SOLR-11072: Implement trigger for searchRate event type.

This commit is contained in:
Andrzej Bialecki 2017-09-07 12:30:56 +02:00
parent a73ffff8e3
commit 8ef83bff12
17 changed files with 720 additions and 227 deletions

View File

@ -98,6 +98,8 @@ New Features
* SOLR-11244: Query DSL for Solr (Cao Manh Dat)
* SOLR-11072: Implement trigger for searchRate event type. (ab)
Bug Fixes
----------------------

View File

@ -20,15 +20,20 @@ package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.core.CoreContainer;
import org.apache.zookeeper.KeeperException;
@ -102,6 +107,35 @@ public class ComputePlanAction extends TriggerActionBase {
.hint(Policy.Suggester.Hint.SRC_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
log.debug("Created suggester with srcNode: {}", event.getProperty(TriggerEvent.NODE_NAMES));
break;
case SEARCHRATE:
Map<String, Map<String, Double>> hotShards = (Map<String, Map<String, Double>>)event.getProperty(AutoScalingParams.SHARD);
Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
List<ReplicaInfo> hotReplicas = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
// node -> MOVEREPLICA
if (hotNodes.isEmpty()) {
log.warn("Neither hot replicas / collection nor nodes are reported in event: " + event);
return NoneSuggester.INSTANCE;
}
suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA);
for (String node : hotNodes.keySet()) {
suggester = suggester.hint(Policy.Suggester.Hint.SRC_NODE, node);
}
} else {
// collection || shard || replica -> ADDREPLICA
suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA);
Set<String> collections = new HashSet<>();
// XXX improve this when AddReplicaSuggester supports coll_shard hint
hotReplicas.forEach(r -> collections.add(r.getCollection()));
hotShards.forEach((coll, shards) -> collections.add(coll));
hotCollections.forEach((coll, rate) -> collections.add(coll));
for (String coll : collections) {
suggester = suggester.hint(Policy.Suggester.Hint.COLL, coll);
}
}
break;
default:
throw new UnsupportedOperationException("No support for events other than nodeAdded and nodeLost, received: " + event.getEventType());
}

View File

@ -17,23 +17,17 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
@ -49,57 +43,26 @@ import org.slf4j.LoggerFactory;
public class NodeAddedTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String name;
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
private final TriggerEventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
private Set<String> lastLiveNodes;
private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
public NodeAddedTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
super(container.getZkController().getZkClient());
this.name = name;
this.properties = properties;
super(TriggerEventType.NODEADDED, name, properties, container.getResourceLoader(), container.getZkController().getZkClient());
this.container = container;
this.timeSource = TimeSource.CURRENT_TIME;
this.processorRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
for (Map<String, String> map : o) {
TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class);
actions.add(action);
}
} else {
actions = Collections.emptyList();
}
lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = Boolean.parseBoolean(String.valueOf(properties.getOrDefault("enabled", "true")));
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
}
@Override
public void init() {
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
for (int i = 0; i < o.size(); i++) {
Map<String, String> map = o.get(i);
actions.get(i).init(map);
}
}
super.init();
// pick up added nodes for which marker paths were created
try {
List<String> added = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
@ -119,69 +82,6 @@ public class NodeAddedTrigger extends TriggerBase {
}
@Override
public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@Override
public String getName() {
return name;
}
@Override
public TriggerEventType getEventType() {
return eventType;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public int getWaitForSecond() {
return waitForSecond;
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public List<TriggerAction> getActions() {
return actions;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof NodeAddedTrigger) {
NodeAddedTrigger that = (NodeAddedTrigger) obj;
return this.name.equals(that.name)
&& this.properties.equals(that.properties);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(name, properties);
}
@Override
public void close() throws IOException {
synchronized (this) {
isClosed = true;
IOUtils.closeWhileHandlingException(actions);
}
}
@Override
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
@ -298,13 +198,6 @@ public class NodeAddedTrigger extends TriggerBase {
}
@Override
public boolean isClosed() {
synchronized (this) {
return isClosed;
}
}
public static class NodeAddedEvent extends TriggerEvent {
public NodeAddedEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames) {

View File

@ -17,23 +17,17 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
@ -49,14 +43,7 @@ import org.slf4j.LoggerFactory;
public class NodeLostTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String name;
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
private final TriggerEventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
@ -67,27 +54,11 @@ public class NodeLostTrigger extends TriggerBase {
public NodeLostTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
super(container.getZkController().getZkClient());
this.name = name;
this.properties = properties;
super(TriggerEventType.NODELOST, name, properties, container.getResourceLoader(), container.getZkController().getZkClient());
this.container = container;
this.timeSource = TimeSource.CURRENT_TIME;
this.processorRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
for (Map<String, String> map : o) {
TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class);
actions.add(action);
}
} else {
actions = Collections.emptyList();
}
lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = Boolean.parseBoolean(String.valueOf(properties.getOrDefault("enabled", "true")));
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
}
@Override
@ -117,69 +88,6 @@ public class NodeLostTrigger extends TriggerBase {
}
}
@Override
public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@Override
public String getName() {
return name;
}
@Override
public TriggerEventType getEventType() {
return eventType;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public int getWaitForSecond() {
return waitForSecond;
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public List<TriggerAction> getActions() {
return actions;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof NodeLostTrigger) {
NodeLostTrigger that = (NodeLostTrigger) obj;
return this.name.equals(that.name)
&& this.properties.equals(that.properties);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(name, properties);
}
@Override
public void close() throws IOException {
synchronized (this) {
isClosed = true;
IOUtils.closeWhileHandlingException(actions);
}
}
@Override
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
@ -296,13 +204,6 @@ public class NodeLostTrigger extends TriggerBase {
}
}
@Override
public boolean isClosed() {
synchronized (this) {
return isClosed;
}
}
public static class NodeLostEvent extends TriggerEvent {
public NodeLostEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames) {

View File

@ -0,0 +1,277 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.metrics.SolrCoreMetricManager;
import org.apache.solr.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Trigger for the {@link org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType#SEARCHRATE} event.
*/
public class SearchRateTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final TimeSource timeSource;
private final CoreContainer container;
private final String handler;
private final String collection;
private final String shard;
private final String node;
private final double rate;
private final Map<String, Long> lastCollectionEvent = new ConcurrentHashMap<>();
private final Map<String, Long> lastNodeEvent = new ConcurrentHashMap<>();
private final Map<String, Long> lastShardEvent = new ConcurrentHashMap<>();
private final Map<String, Long> lastReplicaEvent = new ConcurrentHashMap<>();
private final Map<String, Object> state = new HashMap<>();
public SearchRateTrigger(String name, Map<String, Object> properties, CoreContainer container) {
super(TriggerEventType.SEARCHRATE, name, properties, container.getResourceLoader(), container.getZkController().getZkClient());
this.timeSource = TimeSource.CURRENT_TIME;
this.container = container;
this.state.put("lastCollectionEvent", lastCollectionEvent);
this.state.put("lastNodeEvent", lastNodeEvent);
this.state.put("lastShardEvent", lastShardEvent);
this.state.put("lastReplicaEvent", lastReplicaEvent);
// parse config options
collection = (String)properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY);
shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) {
throw new IllegalArgumentException("When 'shard' is other than #ANY collection name must be also other than #ANY");
}
node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
handler = (String)properties.getOrDefault(AutoScalingParams.HANDLER, "/select");
if (properties.get("rate") == null) {
throw new IllegalArgumentException("No 'rate' specified in configuration");
}
String rateString = String.valueOf(properties.get("rate"));
try {
rate = Double.parseDouble(rateString);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid 'rate' configuration value: '" + rateString + "'", e);
}
}
@Override
protected Map<String, Object> getState() {
return state;
}
@Override
protected void setState(Map<String, Object> state) {
lastCollectionEvent.clear();
lastNodeEvent.clear();
lastShardEvent.clear();
lastReplicaEvent.clear();
Map<String, Long> collTimes = (Map<String, Long>)state.get("lastCollectionEvent");
if (collTimes != null) {
lastCollectionEvent.putAll(collTimes);
}
Map<String, Long> nodeTimes = (Map<String, Long>)state.get("lastNodeEvent");
if (nodeTimes != null) {
lastNodeEvent.putAll(nodeTimes);
}
Map<String, Long> shardTimes = (Map<String, Long>)state.get("lastShardEvent");
if (shardTimes != null) {
lastShardEvent.putAll(shardTimes);
}
Map<String, Long> replicaTimes = (Map<String, Long>)state.get("lastReplicaEvent");
if (replicaTimes != null) {
lastReplicaEvent.putAll(replicaTimes);
}
}
@Override
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
if (old instanceof SearchRateTrigger) {
SearchRateTrigger that = (SearchRateTrigger)old;
assert this.name.equals(that.name);
this.lastCollectionEvent.clear();
this.lastNodeEvent.clear();
this.lastShardEvent.clear();
this.lastReplicaEvent.clear();
this.lastCollectionEvent.putAll(that.lastCollectionEvent);
this.lastNodeEvent.putAll(that.lastNodeEvent);
this.lastShardEvent.putAll(that.lastShardEvent);
this.lastReplicaEvent.putAll(that.lastReplicaEvent);
} else {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"Unable to restore state from an unknown type of trigger");
}
}
@Override
public void run() {
AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (processor == null) {
return;
}
Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>();
Map<String, AtomicDouble> nodeRates = new HashMap<>();
try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
.withClusterStateProvider(new ZkClientClusterStateProvider(container.getZkController().getZkStateReader()))
.build()) {
SolrClientDataProvider dataProvider = new SolrClientDataProvider(cloudSolrClient);
for (String node : dataProvider.getNodes()) {
Map<String, ReplicaInfo> metricTags = new HashMap<>();
// coll, shard, replica
Map<String, Map<String, List<ReplicaInfo>>> infos = dataProvider.getReplicaInfo(node, Collections.emptyList());
infos.forEach((coll, shards) -> {
shards.forEach((sh, replicas) -> {
replicas.forEach(replica -> {
// we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
String replicaName = SolrCoreMetricManager.parseReplicaName(coll, replica.getCore());
if (replicaName == null) { // should never happen???
replicaName = replica.getName(); // which is actually coreNode name...
}
String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null);
String tag = "metrics:" + registry
+ ":QUERY." + handler + ".requestTimes:1minRate";
metricTags.put(tag, replica);
});
});
});
Map<String, Object> rates = dataProvider.getNodeValues(node, metricTags.keySet());
rates.forEach((tag, rate) -> {
ReplicaInfo info = metricTags.get(tag);
if (info == null) {
log.warn("Missing replica info for response tag " + tag);
} else {
Map<String, List<ReplicaInfo>> perCollection = collectionRates.computeIfAbsent(info.getCollection(), s -> new HashMap<>());
List<ReplicaInfo> perShard = perCollection.computeIfAbsent(info.getShard(), s -> new ArrayList<>());
info.getVariables().put(AutoScalingParams.RATE, rate);
perShard.add(info);
AtomicDouble perNode = nodeRates.computeIfAbsent(node, s -> new AtomicDouble());
perNode.addAndGet((Double)rate);
}
});
}
} catch (IOException e) {
log.warn("Exception getting node values", e);
return;
}
long now = timeSource.getTime();
// check for exceeded rates and filter out those with less than waitFor from previous events
Map<String, Double> hotNodes = nodeRates.entrySet().stream()
.filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey()))
.filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
.filter(entry -> entry.getValue().get() > rate)
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get()));
Map<String, Map<String, Double>> hotShards = new HashMap<>();
List<ReplicaInfo> hotReplicas = new ArrayList<>();
collectionRates.forEach((coll, shardRates) -> {
shardRates.forEach((sh, replicaRates) -> {
double shardRate = replicaRates.stream()
.map(r -> {
if (waitForElapsed(r.getCollection() + "." + r.getCore(), now, lastReplicaEvent) &&
((Double)r.getVariable(AutoScalingParams.RATE) > rate)) {
hotReplicas.add(r);
}
return r;
})
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
if (waitForElapsed(coll + "." + sh, now, lastShardEvent) &&
(shardRate > rate) &&
(collection.equals(Policy.ANY) || collection.equals(coll)) &&
(shard.equals(Policy.ANY) || shard.equals(sh))) {
hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
}
});
});
Map<String, Double> hotCollections = new HashMap<>();
collectionRates.forEach((coll, shardRates) -> {
double total = shardRates.entrySet().stream()
.mapToDouble(e -> e.getValue().stream()
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
if (waitForElapsed(coll, now, lastCollectionEvent) &&
(total > rate) &&
(collection.equals(Policy.ANY) || collection.equals(coll))) {
hotCollections.put(coll, total);
}
});
if (hotCollections.isEmpty() && hotShards.isEmpty() && hotReplicas.isEmpty() && hotNodes.isEmpty()) {
return;
}
// generate event
if (processor.process(new SearchRateEvent(getName(), now, hotNodes, hotCollections, hotShards, hotReplicas))) {
// update lastEvent times
hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
hotShards.entrySet().forEach(e -> e.getValue()
.forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
}
}
private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
log.info("name=" + name + ", lastTime=" + lastTime + ", elapsed=" + elapsed);
if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) {
return false;
}
return true;
}
public static class SearchRateEvent extends TriggerEvent {
public SearchRateEvent(String source, long eventTime, Map<String, Double> hotNodes,
Map<String, Double> hotCollections,
Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas) {
super(TriggerEventType.SEARCHRATE, source, eventTime, null);
properties.put(AutoScalingParams.COLLECTION, hotCollections);
properties.put(AutoScalingParams.SHARD, hotShards);
properties.put(AutoScalingParams.REPLICA, hotReplicas);
properties.put(AutoScalingParams.NODE, hotNodes);
}
}
}

View File

@ -16,14 +16,25 @@
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@ -37,19 +48,129 @@ import org.slf4j.LoggerFactory;
public abstract class TriggerBase implements AutoScaling.Trigger {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final String name;
protected final Map<String, Object> properties = new HashMap<>();
protected final TriggerEventType eventType;
protected final int waitForSecond;
protected SolrZkClient zkClient;
protected Map<String,Object> lastState;
protected final List<TriggerAction> actions;
protected final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef = new AtomicReference<>();
protected final boolean enabled;
protected boolean isClosed;
protected TriggerBase(SolrZkClient zkClient) {
protected TriggerBase(TriggerEventType eventType, String name, Map<String, Object> properties, SolrResourceLoader loader, SolrZkClient zkClient) {
this.eventType = eventType;
this.name = name;
this.zkClient = zkClient;
if (properties != null) {
this.properties.putAll(properties);
}
this.enabled = Boolean.parseBoolean(String.valueOf(this.properties.getOrDefault("enabled", "true")));
this.waitForSecond = ((Number) this.properties.getOrDefault("waitFor", -1L)).intValue();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
for (Map<String, String> map : o) {
TriggerAction action = loader.newInstance(map.get("class"), TriggerAction.class);
actions.add(action);
}
} else {
actions = Collections.emptyList();
}
try {
if (!zkClient.exists(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, true)) {
zkClient.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, false, true);
}
} catch (KeeperException | InterruptedException e) {
LOG.warn("Exception checking ZK path " + ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, e);
}
}
@Override
public void init() {
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
for (int i = 0; i < o.size(); i++) {
Map<String, String> map = o.get(i);
actions.get(i).init(map);
}
}
}
@Override
public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@Override
public String getName() {
return name;
}
@Override
public TriggerEventType getEventType() {
return eventType;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public int getWaitForSecond() {
return waitForSecond;
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public List<TriggerAction> getActions() {
return actions;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass().equals(this.getClass())) {
TriggerBase that = (TriggerBase) obj;
return this.name.equals(that.name)
&& this.properties.equals(that.properties);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(name, properties);
}
@Override
public void close() throws IOException {
synchronized (this) {
isClosed = true;
IOUtils.closeWhileHandlingException(actions);
}
}
@Override
public boolean isClosed() {
synchronized (this) {
return isClosed;
}
}
/**
* Prepare and return internal state of this trigger in a format suitable for persisting in ZK.
* @return map of internal state properties. Note: values must be supported by {@link Utils#toJSON(Object)}.

View File

@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -139,14 +140,18 @@ public class MetricsHandler extends RequestHandlerBase implements PermissionName
continue;
}
MetricUtils.PropertyFilter propertyFilter = MetricUtils.PropertyFilter.ALL;
boolean simple = false;
if (propertyName != null) {
propertyFilter = (name) -> name.equals(propertyName);
simple = true;
// use escaped versions
key = parts[0] + ":" + parts[1];
}
MetricUtils.convertMetric(key, m, propertyFilter, false, true, true, simple, ":", (k, v) -> result.add(k, v));
MetricUtils.convertMetric(key, m, propertyFilter, false, true, true, false, ":", (k, v) -> {
if ((v instanceof Map) && propertyName != null) {
((Map)v).forEach((k1, v1) -> result.add(k + ":" + k1, v1));
} else {
result.add(k, v);
}
});
}
rsp.getValues().add("metrics", result);
if (errors.size() > 0) {

View File

@ -0,0 +1,167 @@
package org.apache.solr.cloud.autoscaling;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.junit.BeforeClass;
import org.junit.Test;
/**
*
*/
public class SearchRateTriggerTest extends SolrCloudTestCase {
private static final String PREFIX = SearchRateTriggerTest.class.getSimpleName() + "-";
private static final String COLL1 = PREFIX + "collection1";
private static final String COLL2 = PREFIX + "collection2";
private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(4)
.addConfig("conf", configset("cloud-minimal"))
.configure();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
"conf", 2, 2);
CloudSolrClient solrClient = cluster.getSolrClient();
create.setMaxShardsPerNode(1);
create.process(solrClient);
create = CollectionAdminRequest.createCollection(COLL2,
"conf", 2, 2);
create.setMaxShardsPerNode(1);
create.process(solrClient);
}
@Test
public void testTrigger() throws Exception {
double rate = 1.0;
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
URL baseUrl = cluster.getJettySolrRunners().get(1).getBaseUrl();
long waitForSeconds = 5 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(waitForSeconds, rate);
final List<TriggerEvent> events = new ArrayList<>();
CloudSolrClient solrClient = cluster.getSolrClient();
try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger", props, container)) {
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
trigger.setProcessor(event -> events.add(event));
// generate replica traffic
String coreName = container.getLoadedCoreNames().iterator().next();
String url = baseUrl.toString() + "/" + coreName;
try (HttpSolrClient simpleClient = new HttpSolrClient.Builder(url).build()) {
SolrParams query = params(CommonParams.Q, "*:*", CommonParams.DISTRIB, "false");
for (int i = 0; i < 200; i++) {
simpleClient.query(query);
}
trigger.run();
// waitFor delay
assertEquals(0, events.size());
Thread.sleep(waitForSeconds * 1000 + 2000);
// should generate replica event
trigger.run();
assertEquals(1, events.size());
TriggerEvent event = events.get(0);
assertEquals(TriggerEventType.SEARCHRATE, event.eventType);
List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
assertEquals(1, infos.size());
ReplicaInfo info = infos.get(0);
assertEquals(coreName, info.getCore());
assertTrue((Double)info.getVariable(AutoScalingParams.RATE) > rate);
}
// close that jetty to remove the violation - alternatively wait for 1 min...
cluster.stopJettySolrRunner(1);
events.clear();
SolrParams query = params(CommonParams.Q, "*:*");
for (int i = 0; i < 500; i++) {
solrClient.query(COLL1, query);
}
Thread.sleep(waitForSeconds * 1000 + 2000);
trigger.run();
// should generate collection event
assertEquals(1, events.size());
TriggerEvent event = events.get(0);
Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
assertEquals(1, hotCollections.size());
Double Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
assertTrue(Rate > rate);
events.clear();
for (int i = 0; i < 1000; i++) {
solrClient.query(COLL2, query);
}
Thread.sleep(waitForSeconds * 1000 + 2000);
trigger.run();
// should generate node and collection event but not for COLL2 because of waitFor
assertEquals(1, events.size());
event = events.get(0);
Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
assertEquals(3, hotNodes.size());
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
assertEquals(2, hotCollections.size());
Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
Rate = hotCollections.get(COLL2);
assertNotNull(Rate);
events.clear();
// assert that waitFor prevents new events from being generated
trigger.run();
// should not generate any events
assertEquals(0, events.size());
Thread.sleep(waitForSeconds * 1000 + 2000);
trigger.run();
// should generate node and collection event
assertEquals(1, events.size());
hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
assertEquals(2, hotCollections.size());
Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
Rate = hotCollections.get(COLL2);
assertNotNull(Rate);
hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
assertEquals(3, hotNodes.size());
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
}
}
private Map<String, Object> createTriggerProps(long waitForSeconds, double rate) {
Map<String, Object> props = new HashMap<>();
props.put("rate", rate);
props.put("event", "searchRate");
props.put("waitFor", waitForSeconds);
props.put("enabled", true);
List<Map<String, String>> actions = new ArrayList<>(3);
Map<String, String> map = new HashMap<>(2);
map.put("name", "compute_plan");
map.put("class", "solr.ComputePlanAction");
actions.add(map);
map = new HashMap<>(2);
map.put("name", "execute_plan");
map.put("class", "solr.ExecutePlanAction");
actions.add(map);
props.put("actions", actions);
return props;
}
}

View File

@ -277,6 +277,15 @@ public class MetricsHandlerTest extends SolrTestCaseJ4 {
val = values.findRecursive("metrics", key3);
assertNotNull(val);
String key4 = "solr.core.collection1:QUERY./select.requestTimes:1minRate";
resp = new SolrQueryResponse();
handler.handleRequestBody(req(CommonParams.QT, "/admin/metrics", CommonParams.WT, "json",
MetricsHandler.KEY_PARAM, key4), resp);
values = resp.getValues();
val = values.findRecursive("metrics", key4);
assertNotNull(val);
assertTrue(val instanceof Number);
// test errors
// invalid keys

View File

@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.solr.common.cloud.ClusterState;
public interface ClusterDataProvider extends Closeable {
/**Get the value of each tag for a given node
*
@ -42,6 +44,8 @@ public interface ClusterDataProvider extends Closeable {
Collection<String> getNodes();
ClusterState getClusterState();
/**Get the collection-specific policy
*/
String getPolicyNameByCollection(String coll);

View File

@ -19,7 +19,9 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
import org.apache.solr.client.solrj.SolrRequest;
public class NoneSuggester extends Policy.Suggester{
public class NoneSuggester extends Policy.Suggester {
public static final NoneSuggester INSTANCE = new NoneSuggester();
@Override
SolrRequest init() {
return null;

View File

@ -203,7 +203,7 @@ public class Policy implements MapWriter {
}
Session(ClusterDataProvider dataProvider) {
this.nodes = new ArrayList<>(dataProvider.getNodes());
this.nodes = new ArrayList<>(dataProvider.getClusterState().getLiveNodes());
this.dataProvider = dataProvider;
for (String node : nodes) {
collections.addAll(dataProvider.getReplicaInfo(node, Collections.emptyList()).keySet());

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester.Hint;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.util.Pair;
@ -66,6 +67,11 @@ public class PolicyHelper {
return delegate.getNodes();
}
@Override
public ClusterState getClusterState() {
return delegate.getClusterState();
}
@Override
public String getPolicyNameByCollection(String coll) {
return policyMapping.get() != null && policyMapping.get().containsKey(coll) ?

View File

@ -36,6 +36,7 @@ public class ReplicaInfo implements MapWriter {
this.collection = coll;
this.shard = shard;
this.type = type;
this.core = (String)vals.get("core");
}
@Override
@ -54,6 +55,10 @@ public class ReplicaInfo implements MapWriter {
return core;
}
public String getName() {
return name;
}
public String getCollection() {
return collection;
}
@ -61,4 +66,23 @@ public class ReplicaInfo implements MapWriter {
public String getShard() {
return shard;
}
public Map<String, Object> getVariables() {
return variables;
}
public Object getVariable(String name) {
return variables != null ? variables.get(name) : null;
}
@Override
public String toString() {
return "ReplicaInfo{" +
"name='" + name + '\'' +
", collection='" + collection + '\'' +
", shard='" + shard + '\'' +
", type=" + type +
", variables=" + variables +
'}';
}
}

View File

@ -64,15 +64,14 @@ public class SolrClientDataProvider implements ClusterDataProvider, MapWriter {
private final CloudSolrClient solrClient;
private final Map<String, Map<String, Map<String, List<ReplicaInfo>>>> data = new HashMap<>();
private Set<String> liveNodes;
private ClusterState clusterState;
private Map<String, Object> snitchSession = new HashMap<>();
private Map<String, Map> nodeVsTags = new HashMap<>();
public SolrClientDataProvider(CloudSolrClient solrClient) {
this.solrClient = solrClient;
ZkStateReader zkStateReader = solrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
this.liveNodes = clusterState.getLiveNodes();
clusterState = zkStateReader.getClusterState();
Map<String, ClusterState.CollectionRef> all = clusterState.getCollectionStates();
all.forEach((collName, ref) -> {
DocCollection coll = ref.get();
@ -81,11 +80,21 @@ public class SolrClientDataProvider implements ClusterDataProvider, MapWriter {
Map<String, Map<String, List<ReplicaInfo>>> nodeData = data.computeIfAbsent(replica.getNodeName(), k -> new HashMap<>());
Map<String, List<ReplicaInfo>> collData = nodeData.computeIfAbsent(collName, k -> new HashMap<>());
List<ReplicaInfo> replicas = collData.computeIfAbsent(shard, k -> new ArrayList<>());
replicas.add(new ReplicaInfo(replica.getName(), collName, shard, replica.getType(), new HashMap<>()));
replicas.add(new ReplicaInfo(replica.getName(), collName, shard, replica.getType(), replica.getProperties()));
});
});
}
@Override
public Collection<String> getNodes() {
return clusterState.getLiveNodes();
}
@Override
public ClusterState getClusterState() {
return clusterState;
}
@Override
public String getPolicyNameByCollection(String coll) {
ClusterState.CollectionRef state = solrClient.getClusterStateProvider().getState(coll);
@ -106,14 +115,9 @@ public class SolrClientDataProvider implements ClusterDataProvider, MapWriter {
return data.computeIfAbsent(node, s -> Collections.emptyMap());//todo fill other details
}
@Override
public Collection<String> getNodes() {
return liveNodes;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put("liveNodes", liveNodes);
ew.put("clusterState", clusterState);
ew.put("replicaInfo", Utils.getDeepCopy(data, 5));
ew.put("nodeValues", nodeVsTags);

View File

@ -37,6 +37,12 @@ public interface AutoScalingParams {
String BEFORE_ACTION = "beforeAction";
String AFTER_ACTION = "afterAction";
String TIMEOUT = "timeout";
String COLLECTION = "collection";
String SHARD = "shard";
String REPLICA = "replica";
String NODE = "node";
String HANDLER = "handler";
String RATE = "rate";
String REMOVE_LISTENERS = "removeListeners";
String ZK_VERSION = "zkVersion";

View File

@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause.Violation;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester.Hint;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkStateReader;
@ -438,6 +439,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
return (Collection<String>) m.get("liveNodes");
}
@Override
public ClusterState getClusterState() {
throw new UnsupportedOperationException("getClusterState");
}
@Override
public String getPolicyNameByCollection(String coll) {
return null;
@ -968,6 +974,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
return clusterDataProvider.getNodes();
}
@Override
public ClusterState getClusterState() {
return clusterDataProvider.getClusterState();
}
@Override
public String getPolicyNameByCollection(String coll) {
return null;
@ -1043,6 +1054,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
return replicaInfoMap.keySet();
}
@Override
public ClusterState getClusterState() {
throw new UnsupportedOperationException("getClusterState");
}
@Override
public String getPolicyNameByCollection(String coll) {
return null;
@ -1101,6 +1117,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
return clusterDataProvider.getNodes();
}
@Override
public ClusterState getClusterState() {
return clusterDataProvider.getClusterState();
}
@Override
public String getPolicyNameByCollection(String coll) {
return "p1";
@ -1129,6 +1150,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
return nodeValues.keySet();
}
@Override
public ClusterState getClusterState() {
throw new UnsupportedOperationException("getClusterState");
}
@Override
public String getPolicyNameByCollection(String coll) {
return null;
@ -1175,6 +1201,12 @@ public class TestPolicy extends SolrTestCaseJ4 {
public Collection<String> getNodes() {
return Arrays.asList( "127.0.0.1:50097_solr", "127.0.0.1:50096_solr");
}
@Override
public ClusterState getClusterState() {
throw new UnsupportedOperationException("getClusterState");
}
};
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations(
"newColl", new AutoScalingConfig((Map<String, Object>)Utils.fromJSONString(autoScaleJson)),
@ -1232,6 +1264,12 @@ public class TestPolicy extends SolrTestCaseJ4 {
public Collection<String> getNodes() {
return Arrays.asList("node1", "node2", "node3", "node4");
}
@Override
public ClusterState getClusterState() {
throw new UnsupportedOperationException("getClusterState");
}
};
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations(
"newColl", new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScaleJson)),