SOLR-12181: Add trigger based on document count / index size.

This commit is contained in:
Andrzej Bialecki 2018-04-11 12:35:31 +02:00
parent e99a19755c
commit 376f6c4946
26 changed files with 1633 additions and 131 deletions

View File

@ -88,6 +88,8 @@ New Features
* SOLR-12151: Add abstract MultiSolrCloudTestCase class. (Christine Poerschke) * SOLR-12151: Add abstract MultiSolrCloudTestCase class. (Christine Poerschke)
* SOLR-12181: Add index size autoscaling trigger, based on document count or size in bytes. (ab)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -33,7 +33,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
public class AutoAddReplicasPlanAction extends ComputePlanAction { public class AutoAddReplicasPlanAction extends ComputePlanAction {
@Override @Override
protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) { protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) {
// for backward compatibility // for backward compatibility
ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider(); ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider();
String autoAddReplicas = stateProvider.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null); String autoAddReplicas = stateProvider.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
@ -41,7 +41,7 @@ public class AutoAddReplicasPlanAction extends ComputePlanAction {
return NoneSuggester.get(session); return NoneSuggester.get(session);
} }
Suggester suggester = super.getSuggester(session, event, cloudManager); Suggester suggester = super.getSuggester(session, event, context, cloudManager);
ClusterState clusterState; ClusterState clusterState;
try { try {
clusterState = stateProvider.getClusterState(); clusterState = stateProvider.getClusterState();

View File

@ -180,6 +180,9 @@ public class AutoScaling {
case SCHEDULED: case SCHEDULED:
t = new ScheduledTrigger(name); t = new ScheduledTrigger(name);
break; break;
case INDEXSIZE:
t = new IndexSizeTrigger(name);
break;
default: default:
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name); throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
} }

View File

@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.UnsupportedSuggester;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.params.AutoScalingParams; import org.apache.solr.common.params.AutoScalingParams;
@ -88,7 +89,7 @@ public class ComputePlanAction extends TriggerActionBase {
log.trace("-- state: {}", clusterState); log.trace("-- state: {}", clusterState);
} }
try { try {
Suggester intialSuggester = getSuggester(session, event, cloudManager); Suggester intialSuggester = getSuggester(session, event, context, cloudManager);
Suggester suggester = intialSuggester; Suggester suggester = intialSuggester;
int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState); int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState);
int requestedOperations = getRequestedNumOps(event); int requestedOperations = getRequestedNumOps(event);
@ -112,7 +113,7 @@ public class ComputePlanAction extends TriggerActionBase {
if (suggester.getSession() != null) { if (suggester.getSession() != null) {
session = suggester.getSession(); session = suggester.getSession();
} }
suggester = getSuggester(session, event, cloudManager); suggester = getSuggester(session, event, context, cloudManager);
// break on first null op // break on first null op
// unless a specific number of ops was requested // unless a specific number of ops was requested
@ -190,7 +191,7 @@ public class ComputePlanAction extends TriggerActionBase {
private static final String START = "__start__"; private static final String START = "__start__";
protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) { protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) {
Suggester suggester; Suggester suggester;
switch (event.getEventType()) { switch (event.getEventType()) {
case NODEADDED: case NODEADDED:
@ -203,6 +204,7 @@ public class ComputePlanAction extends TriggerActionBase {
break; break;
case SEARCHRATE: case SEARCHRATE:
case METRIC: case METRIC:
case INDEXSIZE:
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList()); List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
int start = (Integer)event.getProperty(START, 0); int start = (Integer)event.getProperty(START, 0);
if (ops.isEmpty() || start >= ops.size()) { if (ops.isEmpty() || start >= ops.size()) {
@ -210,14 +212,15 @@ public class ComputePlanAction extends TriggerActionBase {
} }
TriggerEvent.Op op = ops.get(start); TriggerEvent.Op op = ops.get(start);
suggester = session.getSuggester(op.getAction()); suggester = session.getSuggester(op.getAction());
if (suggester instanceof UnsupportedSuggester) {
List<TriggerEvent.Op> unsupportedOps = (List<TriggerEvent.Op>)context.getProperties().computeIfAbsent("unsupportedOps", k -> new ArrayList<TriggerEvent.Op>());
unsupportedOps.add(op);
}
for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) { for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) {
suggester = suggester.hint(e.getKey(), e.getValue()); suggester = suggester.hint(e.getKey(), e.getValue());
} }
if (++start >= ops.size()) { start++;
event.getProperties().remove(START);
} else {
event.getProperties().put(START, start); event.getProperties().put(START, start);
}
break; break;
case SCHEDULED: case SCHEDULED:
String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower()); String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
@ -225,7 +228,7 @@ public class ComputePlanAction extends TriggerActionBase {
suggester = session.getSuggester(action); suggester = session.getSuggester(action);
break; break;
default: default:
throw new UnsupportedOperationException("No support for events other than nodeAdded, nodeLost, searchRate and metric. Received: " + event.getEventType()); throw new UnsupportedOperationException("No support for events other than nodeAdded, nodeLost, searchRate, metric and indexSize. Received: " + event.getEventType());
} }
return suggester; return suggester;
} }

View File

@ -0,0 +1,408 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.metrics.SolrCoreMetricManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class IndexSizeTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String ABOVE_BYTES_PROP = "aboveBytes";
public static final String ABOVE_DOCS_PROP = "aboveDocs";
public static final String ABOVE_OP_PROP = "aboveOp";
public static final String BELOW_BYTES_PROP = "belowBytes";
public static final String BELOW_DOCS_PROP = "belowDocs";
public static final String BELOW_OP_PROP = "belowOp";
public static final String COLLECTIONS_PROP = "collections";
public static final String BYTES_SIZE_PROP = "__bytes__";
public static final String DOCS_SIZE_PROP = "__docs__";
public static final String ABOVE_SIZE_PROP = "aboveSize";
public static final String BELOW_SIZE_PROP = "belowSize";
public static final String VIOLATION_PROP = "violationType";
public enum Unit { bytes, docs }
private long aboveBytes, aboveDocs, belowBytes, belowDocs;
private CollectionParams.CollectionAction aboveOp, belowOp;
private final Set<String> collections = new HashSet<>();
private final Map<String, Long> lastEventMap = new ConcurrentHashMap<>();
public IndexSizeTrigger(String name) {
super(TriggerEventType.INDEXSIZE, name);
TriggerUtils.validProperties(validProperties,
ABOVE_BYTES_PROP, ABOVE_DOCS_PROP, BELOW_BYTES_PROP, BELOW_DOCS_PROP, COLLECTIONS_PROP);
}
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
super.configure(loader, cloudManager, properties);
String aboveStr = String.valueOf(properties.getOrDefault(ABOVE_BYTES_PROP, Long.MAX_VALUE));
String belowStr = String.valueOf(properties.getOrDefault(BELOW_BYTES_PROP, -1));
try {
aboveBytes = Long.parseLong(aboveStr);
if (aboveBytes <= 0) {
throw new Exception("value must be > 0");
}
} catch (Exception e) {
throw new TriggerValidationException(getName(), ABOVE_BYTES_PROP, "invalid value '" + aboveStr + "': " + e.toString());
}
try {
belowBytes = Long.parseLong(belowStr);
if (belowBytes < 0) {
belowBytes = -1;
}
} catch (Exception e) {
throw new TriggerValidationException(getName(), BELOW_BYTES_PROP, "invalid value '" + belowStr + "': " + e.toString());
}
// below must be at least 2x smaller than above, otherwise splitting a shard
// would immediately put the shard below the threshold and cause the mergeshards action
if (belowBytes > 0 && (belowBytes * 2 > aboveBytes)) {
throw new TriggerValidationException(getName(), BELOW_BYTES_PROP,
"invalid value " + belowBytes + ", should be less than half of '" + ABOVE_BYTES_PROP + "' value, which is " + aboveBytes);
}
// do the same for docs bounds
aboveStr = String.valueOf(properties.getOrDefault(ABOVE_DOCS_PROP, Long.MAX_VALUE));
belowStr = String.valueOf(properties.getOrDefault(BELOW_DOCS_PROP, -1));
try {
aboveDocs = Long.parseLong(aboveStr);
if (aboveDocs <= 0) {
throw new Exception("value must be > 0");
}
} catch (Exception e) {
throw new TriggerValidationException(getName(), ABOVE_DOCS_PROP, "invalid value '" + aboveStr + "': " + e.toString());
}
try {
belowDocs = Long.parseLong(belowStr);
if (belowDocs < 0) {
belowDocs = -1;
}
} catch (Exception e) {
throw new TriggerValidationException(getName(), BELOW_DOCS_PROP, "invalid value '" + belowStr + "': " + e.toString());
}
// below must be at least 2x smaller than above, otherwise splitting a shard
// would immediately put the shard below the threshold and cause the mergeshards action
if (belowDocs > 0 && (belowDocs * 2 > aboveDocs)) {
throw new TriggerValidationException(getName(), BELOW_DOCS_PROP,
"invalid value " + belowDocs + ", should be less than half of '" + ABOVE_DOCS_PROP + "' value, which is " + aboveDocs);
}
String collectionsString = (String) properties.get(COLLECTIONS_PROP);
if (collectionsString != null && !collectionsString.isEmpty()) {
collections.addAll(StrUtils.splitSmart(collectionsString, ','));
}
String aboveOpStr = String.valueOf(properties.getOrDefault(ABOVE_OP_PROP, CollectionParams.CollectionAction.SPLITSHARD.toLower()));
// TODO: this is a placeholder until SOLR-9407 is implemented
String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.MERGESHARDS.toLower()));
aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
if (aboveOp == null) {
throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of " + ABOVE_OP_PROP + ": '" + aboveOpStr + "'");
}
belowOp = CollectionParams.CollectionAction.get(belowOpStr);
if (belowOp == null) {
throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of " + BELOW_OP_PROP + ": '" + belowOpStr + "'");
}
}
@Override
protected Map<String, Object> getState() {
Map<String, Object> state = new HashMap<>();
state.put("lastEventMap", lastEventMap);
return state;
}
@Override
protected void setState(Map<String, Object> state) {
this.lastEventMap.clear();
Map<String, Long> replicaVsTime = (Map<String, Long>)state.get("lastEventMap");
if (replicaVsTime != null) {
this.lastEventMap.putAll(replicaVsTime);
}
}
@Override
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
if (old instanceof IndexSizeTrigger) {
} else {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"Unable to restore state from an unknown type of trigger");
}
}
@Override
public void run() {
synchronized(this) {
if (isClosed) {
log.warn(getName() + " ran but was already closed");
return;
}
}
AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (processor == null) {
return;
}
// replica name / info + size, retrieved from leaders only
Map<String, ReplicaInfo> currentSizes = new HashMap<>();
try {
ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
for (String node : clusterState.getLiveNodes()) {
Map<String, ReplicaInfo> metricTags = new HashMap<>();
// coll, shard, replica
Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
infos.forEach((coll, shards) -> {
if (!collections.isEmpty() && !collections.contains(coll)) {
return;
}
DocCollection docCollection = clusterState.getCollection(coll);
shards.forEach((sh, replicas) -> {
// check only the leader of a replica in active shard
Slice s = docCollection.getSlice(sh);
if (s.getState() != Slice.State.ACTIVE) {
return;
}
Replica r = s.getLeader();
// no leader - don't do anything
if (r == null) {
return;
}
// find ReplicaInfo
ReplicaInfo info = null;
for (ReplicaInfo ri : replicas) {
if (r.getCoreName().equals(ri.getCore())) {
info = ri;
break;
}
}
if (info == null) {
// probably replica is not on this node?
return;
}
// we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
String replicaName = Utils.parseMetricsReplicaName(coll, info.getCore());
if (replicaName == null) { // should never happen???
replicaName = info.getName(); // which is actually coreNode name...
}
String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null);
String tag = "metrics:" + registry + ":INDEX.sizeInBytes";
metricTags.put(tag, info);
tag = "metrics:" + registry + ":SEARCHER.searcher.numDocs";
metricTags.put(tag, info);
});
});
if (metricTags.isEmpty()) {
continue;
}
Map<String, Object> sizes = cloudManager.getNodeStateProvider().getNodeValues(node, metricTags.keySet());
sizes.forEach((tag, size) -> {
final ReplicaInfo info = metricTags.get(tag);
if (info == null) {
log.warn("Missing replica info for response tag " + tag);
} else {
// verify that it's a Number
if (!(size instanceof Number)) {
log.warn("invalid size value - not a number: '" + size + "' is " + size.getClass().getName());
return;
}
ReplicaInfo currentInfo = currentSizes.computeIfAbsent(info.getCore(), k -> (ReplicaInfo)info.clone());
if (tag.contains("INDEX")) {
currentInfo.getVariables().put(BYTES_SIZE_PROP, ((Number) size).longValue());
} else {
currentInfo.getVariables().put(DOCS_SIZE_PROP, ((Number) size).longValue());
}
}
});
}
} catch (IOException e) {
log.warn("Error running trigger " + getName(), e);
return;
}
long now = cloudManager.getTimeSource().getTimeNs();
// now check thresholds
// collection / list(info)
Map<String, List<ReplicaInfo>> aboveSize = new HashMap<>();
currentSizes.entrySet().stream()
.filter(e -> (
(Long)e.getValue().getVariable(BYTES_SIZE_PROP) > aboveBytes ||
(Long)e.getValue().getVariable(DOCS_SIZE_PROP) > aboveDocs
) && waitForElapsed(e.getKey(), now, lastEventMap))
.forEach(e -> {
ReplicaInfo info = e.getValue();
List<ReplicaInfo> infos = aboveSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
if (!infos.contains(info)) {
if ((Long)e.getValue().getVariable(BYTES_SIZE_PROP) > aboveBytes) {
info.getVariables().put(VIOLATION_PROP, ABOVE_BYTES_PROP);
} else {
info.getVariables().put(VIOLATION_PROP, ABOVE_DOCS_PROP);
}
infos.add(info);
}
});
// collection / list(info)
Map<String, List<ReplicaInfo>> belowSize = new HashMap<>();
currentSizes.entrySet().stream()
.filter(e -> (
(Long)e.getValue().getVariable(BYTES_SIZE_PROP) < belowBytes ||
(Long)e.getValue().getVariable(DOCS_SIZE_PROP) < belowDocs
) && waitForElapsed(e.getKey(), now, lastEventMap))
.forEach(e -> {
ReplicaInfo info = e.getValue();
List<ReplicaInfo> infos = belowSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
if (!infos.contains(info)) {
if ((Long)e.getValue().getVariable(BYTES_SIZE_PROP) < belowBytes) {
info.getVariables().put(VIOLATION_PROP, BELOW_BYTES_PROP);
} else {
info.getVariables().put(VIOLATION_PROP, BELOW_DOCS_PROP);
}
infos.add(info);
}
});
if (aboveSize.isEmpty() && belowSize.isEmpty()) {
return;
}
// find the earliest time when a condition was exceeded
final AtomicLong eventTime = new AtomicLong(now);
// calculate ops
final List<TriggerEvent.Op> ops = new ArrayList<>();
aboveSize.forEach((coll, replicas) -> {
replicas.forEach(r -> {
TriggerEvent.Op op = new TriggerEvent.Op(aboveOp);
op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(coll, r.getShard()));
ops.add(op);
Long time = lastEventMap.get(r.getCore());
if (time != null && eventTime.get() > time) {
eventTime.set(time);
}
});
});
belowSize.forEach((coll, replicas) -> {
if (replicas.size() < 2) {
return;
}
// sort by increasing size
replicas.sort((r1, r2) -> {
// XXX this is not quite correct - if BYTES_SIZE_PROP decided that replica got here
// then we should be sorting by BYTES_SIZE_PROP. However, since DOCS and BYTES are
// loosely correlated it's simpler to sort just by docs (which better reflects the "too small"
// condition than index size, due to possibly existing deleted docs that still occupy space)
long delta = (Long) r1.getVariable(DOCS_SIZE_PROP) - (Long) r2.getVariable(DOCS_SIZE_PROP);
if (delta > 0) {
return 1;
} else if (delta < 0) {
return -1;
} else {
return 0;
}
});
// TODO: MERGESHARDS is not implemented yet. For now take the top two smallest shards
// TODO: but in the future we probably need to get ones with adjacent ranges.
// TODO: generate as many MERGESHARDS as needed to consume all belowSize shards
TriggerEvent.Op op = new TriggerEvent.Op(belowOp);
op.addHint(Suggester.Hint.COLL_SHARD, new Pair(coll, replicas.get(0).getShard()));
op.addHint(Suggester.Hint.COLL_SHARD, new Pair(coll, replicas.get(1).getShard()));
ops.add(op);
Long time = lastEventMap.get(replicas.get(0).getCore());
if (time != null && eventTime.get() > time) {
eventTime.set(time);
}
time = lastEventMap.get(replicas.get(1).getCore());
if (time != null && eventTime.get() > time) {
eventTime.set(time);
}
});
if (ops.isEmpty()) {
return;
}
if (processor.process(new IndexSizeEvent(getName(), eventTime.get(), ops, aboveSize, belowSize))) {
// update last event times
aboveSize.forEach((coll, replicas) -> {
replicas.forEach(r -> lastEventMap.put(r.getCore(), now));
});
belowSize.forEach((coll, replicas) -> {
lastEventMap.put(replicas.get(0).getCore(), now);
lastEventMap.put(replicas.get(1).getCore(), now);
});
}
}
private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
log.trace("name={}, lastTime={}, elapsed={}", name, lastTime, elapsed);
if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) {
return false;
}
return true;
}
public static class IndexSizeEvent extends TriggerEvent {
public IndexSizeEvent(String source, long eventTime, List<Op> ops, Map<String, List<ReplicaInfo>> aboveSize,
Map<String, List<ReplicaInfo>> belowSize) {
super(TriggerEventType.INDEXSIZE, source, eventTime, null);
properties.put(TriggerEvent.REQUESTED_OPS, ops);
properties.put(ABOVE_SIZE_PROP, aboveSize);
properties.put(BELOW_SIZE_PROP, belowSize);
}
}
}

View File

@ -203,12 +203,12 @@ public class MetricTrigger extends TriggerBase {
List<Op> ops = new ArrayList<>(hotNodes.size()); List<Op> ops = new ArrayList<>(hotNodes.size());
for (String n : hotNodes.keySet()) { for (String n : hotNodes.keySet()) {
Op op = new Op(CollectionParams.CollectionAction.get(preferredOp)); Op op = new Op(CollectionParams.CollectionAction.get(preferredOp));
op.setHint(Suggester.Hint.SRC_NODE, n); op.addHint(Suggester.Hint.SRC_NODE, n);
if (!collection.equals(Policy.ANY)) { if (!collection.equals(Policy.ANY)) {
if (!shard.equals(Policy.ANY)) { if (!shard.equals(Policy.ANY)) {
op.setHint(Suggester.Hint.COLL_SHARD, new Pair<>(collection, shard)); op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(collection, shard));
} else { } else {
op.setHint(Suggester.Hint.COLL, collection); op.addHint(Suggester.Hint.COLL, collection);
} }
} }
ops.add(op); ops.add(op);

View File

@ -181,10 +181,11 @@ public class SearchRateTrigger extends TriggerBase {
} else { } else {
Map<String, List<ReplicaInfo>> perCollection = collectionRates.computeIfAbsent(info.getCollection(), s -> new HashMap<>()); Map<String, List<ReplicaInfo>> perCollection = collectionRates.computeIfAbsent(info.getCollection(), s -> new HashMap<>());
List<ReplicaInfo> perShard = perCollection.computeIfAbsent(info.getShard(), s -> new ArrayList<>()); List<ReplicaInfo> perShard = perCollection.computeIfAbsent(info.getShard(), s -> new ArrayList<>());
info.getVariables().put(AutoScalingParams.RATE, rate); info = (ReplicaInfo)info.clone();
info.getVariables().put(AutoScalingParams.RATE, ((Number)rate).doubleValue());
perShard.add(info); perShard.add(info);
AtomicDouble perNode = nodeRates.computeIfAbsent(node, s -> new AtomicDouble()); AtomicDouble perNode = nodeRates.computeIfAbsent(node, s -> new AtomicDouble());
perNode.addAndGet((Double)rate); perNode.addAndGet(((Number)rate).doubleValue());
} }
}); });
} }

View File

@ -17,9 +17,13 @@
package org.apache.solr.cloud.autoscaling; package org.apache.solr.cloud.autoscaling;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap; import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
@ -49,11 +53,17 @@ public class TriggerEvent implements MapWriter {
public Op(CollectionParams.CollectionAction action, Suggester.Hint hint, Object hintValue) { public Op(CollectionParams.CollectionAction action, Suggester.Hint hint, Object hintValue) {
this.action = action; this.action = action;
this.hints.put(hint, hintValue); addHint(hint, hintValue);
} }
public void setHint(Suggester.Hint hint, Object value) { public void addHint(Suggester.Hint hint, Object value) {
hints.put(hint, value); hint.validator.accept(value);
if (hint.multiValued) {
Collection<?> values = value instanceof Collection ? (Collection) value : Collections.singletonList(value);
((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).addAll(values);
} else {
hints.put(hint, value == null ? null : String.valueOf(value));
}
} }
public CollectionParams.CollectionAction getAction() { public CollectionParams.CollectionAction getAction() {

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -113,20 +114,31 @@ public class CloudTestUtils {
* number of shards and replicas * number of shards and replicas
*/ */
public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) { public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
return clusterShape(expectedShards, expectedReplicas, false);
}
public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas, boolean withInactive) {
return (liveNodes, collectionState) -> { return (liveNodes, collectionState) -> {
if (collectionState == null) if (collectionState == null) {
log.debug("-- null collection");
return false; return false;
if (collectionState.getSlices().size() != expectedShards) }
Collection<Slice> slices = withInactive ? collectionState.getSlices() : collectionState.getActiveSlices();
if (slices.size() != expectedShards) {
log.debug("-- wrong number of active slices, expected=" + expectedShards + ", found=" + collectionState.getSlices().size());
return false; return false;
for (Slice slice : collectionState) { }
for (Slice slice : slices) {
int activeReplicas = 0; int activeReplicas = 0;
for (Replica replica : slice) { for (Replica replica : slice) {
if (replica.isActive(liveNodes)) if (replica.isActive(liveNodes))
activeReplicas++; activeReplicas++;
} }
if (activeReplicas != expectedReplicas) if (activeReplicas != expectedReplicas) {
log.debug("-- wrong number of active replicas in slice " + slice.getName() + ", expected=" + expectedReplicas + ", found=" + activeReplicas);
return false; return false;
} }
}
return true; return true;
}; };
} }

View File

@ -0,0 +1,647 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.LogLevel;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
/**
*
*/
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
public class IndexSizeTriggerTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static SolrCloudManager cloudManager;
private static SolrClient solrClient;
private static TimeSource timeSource;
private static SolrResourceLoader loader;
private static int SPEED;
private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the processor to fire on first run! event=" + event);
return true;
};
private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(2);
static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
static CountDownLatch listenerCreated = new CountDownLatch(1);
static CountDownLatch finished = new CountDownLatch(1);
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
if (random().nextBoolean()) {
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
solrClient = cluster.getSolrClient();
loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
SPEED = 1;
} else {
SPEED = 50;
cloudManager = SimCloudManager.createCluster(2, TimeSource.get("simTime:" + SPEED));
// wait for defaults to be applied - due to accelerated time sometimes we may miss this
cloudManager.getTimeSource().sleep(10000);
AutoScalingConfig cfg = cloudManager.getDistribStateManager().getAutoScalingConfig();
assertFalse("autoscaling config is empty", cfg.isEmpty());
solrClient = ((SimCloudManager)cloudManager).simGetSolrClient();
loader = ((SimCloudManager) cloudManager).getLoader();
}
timeSource = cloudManager.getTimeSource();
}
@After
public void restoreDefaults() throws Exception {
if (cloudManager instanceof SimCloudManager) {
log.info(((SimCloudManager) cloudManager).dumpClusterState(true));
((SimCloudManager) cloudManager).getSimClusterStateProvider().simDeleteAllCollections();
((SimCloudManager) cloudManager).simResetOpCounts();
} else {
cluster.deleteAllCollections();
}
cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), -1);
cloudManager.getTimeSource().sleep(5000);
listenerEvents.clear();
listenerCreated = new CountDownLatch(1);
finished = new CountDownLatch(1);
}
@AfterClass
public static void teardown() throws Exception {
if (cloudManager instanceof SimCloudManager) {
cloudManager.close();
}
solrClient = null;
cloudManager = null;
}
@Test
public void testTrigger() throws Exception {
String collectionName = "testTrigger_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", 2, 2).setMaxShardsPerNode(2);
create.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
CloudTestUtils.clusterShape(2, 2));
long waitForSeconds = 3 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (IndexSizeTrigger trigger = new IndexSizeTrigger("index_size_trigger")) {
trigger.configure(loader, cloudManager, props);
trigger.init();
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
for (int i = 0; i < 25; i++) {
SolrInputDocument doc = new SolrInputDocument("id", "id-" + i);
solrClient.add(collectionName, doc);
}
solrClient.commit(collectionName);
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
fail("processor was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
}
} else {
fail("IndexSizeTrigger was fired more than once!");
}
return true;
});
trigger.run();
TriggerEvent ev = eventRef.get();
// waitFor delay - should not produce any event yet
assertNull("waitFor not elapsed but produced an event", ev);
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
trigger.run();
ev = eventRef.get();
assertNotNull("should have fired an event", ev);
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) ev.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull("should contain requestedOps", ops);
assertEquals("number of ops", 2, ops.size());
boolean shard1 = false;
boolean shard2 = false;
for (TriggerEvent.Op op : ops) {
assertEquals(CollectionParams.CollectionAction.SPLITSHARD, op.getAction());
Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
assertNotNull("hints", hints);
assertEquals("hints", 1, hints.size());
Pair<String, String> p = hints.iterator().next();
assertEquals(collectionName, p.first());
if (p.second().equals("shard1")) {
shard1 = true;
} else if (p.second().equals("shard2")) {
shard2 = true;
} else {
fail("unexpected shard name " + p.second());
}
}
assertTrue("shard1 should be split", shard1);
assertTrue("shard2 should be split", shard2);
}
}
public static class CapturingTriggerListener extends TriggerListenerBase {
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException {
super.configure(loader, cloudManager, config);
listenerCreated.countDown();
}
@Override
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> " + ev);
lst.add(ev);
}
}
public static class FinishedProcessingListener extends TriggerListenerBase {
@Override
public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
finished.countDown();
}
}
@Test
public void testSplitIntegration() throws Exception {
String collectionName = "testSplitIntegration_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", 2, 2).setMaxShardsPerNode(2);
create.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
CloudTestUtils.clusterShape(2, 2));
long waitForSeconds = 3 + random().nextInt(5);
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'index_size_trigger'," +
"'event' : 'indexSize'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'aboveDocs' : 10," +
"'belowDocs' : 4," +
"'enabled' : true," +
"'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
"{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
String setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'capturing'," +
"'trigger' : 'index_size_trigger'," +
"'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
"'beforeAction' : ['compute_plan','execute_plan']," +
"'afterAction' : ['compute_plan','execute_plan']," +
"'class' : '" + CapturingTriggerListener.class.getName() + "'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'finished'," +
"'trigger' : 'index_size_trigger'," +
"'stage' : ['SUCCEEDED']," +
"'class' : '" + FinishedProcessingListener.class.getName() + "'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
for (int i = 0; i < 25; i++) {
SolrInputDocument doc = new SolrInputDocument("id", "id-" + i);
solrClient.add(collectionName, doc);
}
solrClient.commit(collectionName);
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
boolean await = finished.await(60000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("did not finish processing in time", await);
CloudTestUtils.waitForState(cloudManager, collectionName, 10, TimeUnit.SECONDS, CloudTestUtils.clusterShape(4, 2));
assertEquals(1, listenerEvents.size());
List<CapturedEvent> events = listenerEvents.get("capturing");
assertNotNull("'capturing' events not found", events);
assertEquals("events: " + events, 6, events.size());
assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage);
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage);
assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage);
// check ops
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull("should contain requestedOps", ops);
assertEquals("number of ops", 2, ops.size());
boolean shard1 = false;
boolean shard2 = false;
for (TriggerEvent.Op op : ops) {
assertEquals(CollectionParams.CollectionAction.SPLITSHARD, op.getAction());
Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
assertNotNull("hints", hints);
assertEquals("hints", 1, hints.size());
Pair<String, String> p = hints.iterator().next();
assertEquals(collectionName, p.first());
if (p.second().equals("shard1")) {
shard1 = true;
} else if (p.second().equals("shard2")) {
shard2 = true;
} else {
fail("unexpected shard name " + p.second());
}
}
assertTrue("shard1 should be split", shard1);
assertTrue("shard2 should be split", shard2);
}
@Test
public void testMergeIntegration() throws Exception {
String collectionName = "testMergeIntegration_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", 2, 2).setMaxShardsPerNode(2);
create.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
CloudTestUtils.clusterShape(2, 2));
for (int i = 0; i < 10; i++) {
SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100));
solrClient.add(collectionName, doc);
}
solrClient.commit(collectionName);
long waitForSeconds = 3 + random().nextInt(5);
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'index_size_trigger'," +
"'event' : 'indexSize'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'aboveDocs' : 40," +
"'belowDocs' : 4," +
"'enabled' : true," +
"'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
"{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
String setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'capturing'," +
"'trigger' : 'index_size_trigger'," +
"'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
"'beforeAction' : ['compute_plan','execute_plan']," +
"'afterAction' : ['compute_plan','execute_plan']," +
"'class' : '" + CapturingTriggerListener.class.getName() + "'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'finished'," +
"'trigger' : 'index_size_trigger'," +
"'stage' : ['SUCCEEDED']," +
"'class' : '" + FinishedProcessingListener.class.getName() + "'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
// delete some docs to trigger a merge
for (int i = 0; i < 5; i++) {
solrClient.deleteById(collectionName, "id-" + (i * 100));
}
solrClient.commit(collectionName);
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
boolean await = finished.await(60000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("did not finish processing in time", await);
assertEquals(1, listenerEvents.size());
List<CapturedEvent> events = listenerEvents.get("capturing");
assertNotNull("'capturing' events not found", events);
assertEquals("events: " + events, 6, events.size());
assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage);
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage);
assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage);
// check ops
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull("should contain requestedOps", ops);
assertTrue("number of ops: " + ops, ops.size() > 0);
for (TriggerEvent.Op op : ops) {
assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction());
Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
assertNotNull("hints", hints);
assertEquals("hints", 2, hints.size());
Pair<String, String> p = hints.iterator().next();
assertEquals(collectionName, p.first());
}
// TODO: fix this once MERGESHARDS is supported
List<TriggerEvent.Op> unsupportedOps = (List<TriggerEvent.Op>)events.get(2).context.get("properties.unsupportedOps");
assertNotNull("should have unsupportedOps", unsupportedOps);
assertEquals(unsupportedOps.toString() + "\n" + ops, ops.size(), unsupportedOps.size());
unsupportedOps.forEach(op -> assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction()));
}
@Test
public void testMixedBounds() throws Exception {
if (cloudManager instanceof SimCloudManager) {
log.warn("Requires SOLR-12208");
return;
}
String collectionName = "testMixedBounds_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", 2, 2).setMaxShardsPerNode(2);
create.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
CloudTestUtils.clusterShape(2, 2));
for (int j = 0; j < 10; j++) {
UpdateRequest ureq = new UpdateRequest();
ureq.setParam("collection", collectionName);
for (int i = 0; i < 100; i++) {
SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100) + "-" + j);
doc.addField("foo", TestUtil.randomSimpleString(random(), 130, 130));
ureq.add(doc);
}
solrClient.request(ureq);
}
solrClient.commit(collectionName);
long waitForSeconds = 3 + random().nextInt(5);
// the trigger is initially disabled so that we have time to add listeners
// and have them capture all events once the trigger is enabled
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'index_size_trigger'," +
"'event' : 'indexSize'," +
"'waitFor' : '" + waitForSeconds + "s'," +
// don't hit this limit when indexing
"'aboveDocs' : 10000," +
// hit this limit when deleting
"'belowDocs' : 100," +
// hit this limit when indexing
"'aboveBytes' : 150000," +
// don't hit this limit when deleting
"'belowBytes' : 10," +
"'enabled' : false," +
"'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
"{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
String setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'capturing'," +
"'trigger' : 'index_size_trigger'," +
"'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
"'beforeAction' : ['compute_plan','execute_plan']," +
"'afterAction' : ['compute_plan','execute_plan']," +
"'class' : '" + CapturingTriggerListener.class.getName() + "'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'finished'," +
"'trigger' : 'index_size_trigger'," +
"'stage' : ['SUCCEEDED']," +
"'class' : '" + FinishedProcessingListener.class.getName() + "'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
// now enable the trigger
String resumeTriggerCommand = "{" +
"'resume-trigger' : {" +
"'name' : 'index_size_trigger'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("did not finish processing in time", await);
assertEquals(1, listenerEvents.size());
List<CapturedEvent> events = listenerEvents.get("capturing");
assertNotNull("'capturing' events not found", events);
assertEquals("events: " + events, 6, events.size());
assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage);
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage);
assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage);
// collection should have 2 inactive and 4 active shards
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
CloudTestUtils.clusterShape(6, 2, true));
// check ops
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull("should contain requestedOps", ops);
assertEquals("number of ops", 2, ops.size());
boolean shard1 = false;
boolean shard2 = false;
for (TriggerEvent.Op op : ops) {
assertEquals(CollectionParams.CollectionAction.SPLITSHARD, op.getAction());
Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
assertNotNull("hints", hints);
assertEquals("hints", 1, hints.size());
Pair<String, String> p = hints.iterator().next();
assertEquals(collectionName, p.first());
if (p.second().equals("shard1")) {
shard1 = true;
} else if (p.second().equals("shard2")) {
shard2 = true;
} else {
fail("unexpected shard name " + p.second());
}
}
assertTrue("shard1 should be split", shard1);
assertTrue("shard2 should be split", shard2);
// now delete most of docs to trigger belowDocs condition
listenerEvents.clear();
finished = new CountDownLatch(1);
// suspend the trigger first so that we can safely delete all docs
String suspendTriggerCommand = "{" +
"'suspend-trigger' : {" +
"'name' : 'index_size_trigger'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
for (int j = 0; j < 8; j++) {
UpdateRequest ureq = new UpdateRequest();
ureq.setParam("collection", collectionName);
for (int i = 0; i < 95; i++) {
ureq.deleteById("id-" + (i * 100) + "-" + j);
}
solrClient.request(ureq);
}
solrClient.commit(collectionName);
// resume trigger
req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("did not finish processing in time", await);
assertEquals(1, listenerEvents.size());
events = listenerEvents.get("capturing");
assertNotNull("'capturing' events not found", events);
assertEquals("events: " + events, 6, events.size());
assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage);
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage);
assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage);
// check ops
ops = (List<TriggerEvent.Op>) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull("should contain requestedOps", ops);
assertTrue("number of ops: " + ops, ops.size() > 0);
for (TriggerEvent.Op op : ops) {
assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction());
Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
assertNotNull("hints", hints);
assertEquals("hints", 2, hints.size());
Pair<String, String> p = hints.iterator().next();
assertEquals(collectionName, p.first());
}
// TODO: fix this once MERGESHARDS is supported
List<TriggerEvent.Op> unsupportedOps = (List<TriggerEvent.Op>)events.get(2).context.get("properties.unsupportedOps");
assertNotNull("should have unsupportedOps", unsupportedOps);
assertEquals(unsupportedOps.toString() + "\n" + ops, ops.size(), unsupportedOps.size());
unsupportedOps.forEach(op -> assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction()));
}
private Map<String, Object> createTriggerProps(long waitForSeconds) {
Map<String, Object> props = new HashMap<>();
props.put("event", "indexSize");
props.put("waitFor", waitForSeconds);
props.put("enabled", true);
props.put(IndexSizeTrigger.ABOVE_DOCS_PROP, 10);
props.put(IndexSizeTrigger.BELOW_DOCS_PROP, 2);
List<Map<String, String>> actions = new ArrayList<>(3);
Map<String, String> map = new HashMap<>(2);
map.put("name", "compute_plan");
map.put("class", "solr.ComputePlanAction");
actions.add(map);
map = new HashMap<>(2);
map.put("name", "execute_plan");
map.put("class", "solr.ExecutePlanAction");
actions.add(map);
props.put("actions", actions);
return props;
}
}

View File

@ -91,7 +91,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
long eventTimeNanos = event.getEventTime(); long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS; long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) { if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
fail("NodeAddedListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos); fail("processor was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
} }
} else { } else {
fail("NodeAddedTrigger was fired more than once!"); fail("NodeAddedTrigger was fired more than once!");

View File

@ -165,7 +165,7 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
.setShardName("shard1"); .setShardName("shard1");
split1.process(solrClient); split1.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "failed to split " + collection1, collection1, CloudTestUtils.waitForState(cloudManager, "failed to split " + collection1, collection1,
CloudTestUtils.clusterShape(3, 1)); CloudTestUtils.clusterShape(3, 1, true));
String setListenerCommand = "{" + String setListenerCommand = "{" +
"'set-listener' : " + "'set-listener' : " +

View File

@ -24,8 +24,10 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
@ -42,8 +44,11 @@ import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.DistribStateManager; import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.NodeStateProvider; import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.impl.ClusterStateProvider; import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.RequestWriter; import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.client.solrj.response.RequestStatusState;
@ -55,6 +60,7 @@ import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.rule.ImplicitSnitch; import org.apache.solr.common.cloud.rule.ImplicitSnitch;
@ -240,6 +246,67 @@ public class SimCloudManager implements SolrCloudManager {
return values; return values;
} }
public String dumpClusterState(boolean withCollections) throws Exception {
StringBuilder sb = new StringBuilder();
sb.append("#######################################\n");
sb.append("############ CLUSTER STATE ############\n");
sb.append("#######################################\n");
sb.append("## Live nodes:\t\t" + getLiveNodesSet().size() + "\n");
int emptyNodes = 0;
int maxReplicas = 0;
int minReplicas = Integer.MAX_VALUE;
Map<String, Map<Replica.State, AtomicInteger>> replicaStates = new TreeMap<>();
int numReplicas = 0;
for (String node : getLiveNodesSet().get()) {
List<ReplicaInfo> replicas = getSimClusterStateProvider().simGetReplicaInfos(node);
numReplicas += replicas.size();
if (replicas.size() > maxReplicas) {
maxReplicas = replicas.size();
}
if (minReplicas > replicas.size()) {
minReplicas = replicas.size();
}
for (ReplicaInfo ri : replicas) {
replicaStates.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
.computeIfAbsent(ri.getState(), s -> new AtomicInteger())
.incrementAndGet();
}
if (replicas.isEmpty()) {
emptyNodes++;
}
}
if (minReplicas == Integer.MAX_VALUE) {
minReplicas = 0;
}
sb.append("## Empty nodes:\t" + emptyNodes + "\n");
Set<String> deadNodes = getSimNodeStateProvider().simGetDeadNodes();
sb.append("## Dead nodes:\t\t" + deadNodes.size() + "\n");
deadNodes.forEach(n -> sb.append("##\t\t" + n + "\n"));
sb.append("## Collections:\t" + getSimClusterStateProvider().simListCollections() + "\n");
if (withCollections) {
ClusterState state = clusterStateProvider.getClusterState();
state.forEachCollection(coll -> sb.append(coll.toString() + "\n"));
}
sb.append("## Max replicas per node:\t" + maxReplicas + "\n");
sb.append("## Min replicas per node:\t" + minReplicas + "\n");
sb.append("## Total replicas:\t\t" + numReplicas + "\n");
replicaStates.forEach((c, map) -> {
AtomicInteger repCnt = new AtomicInteger();
map.forEach((s, cnt) -> repCnt.addAndGet(cnt.get()));
sb.append("## * " + c + "\t\t" + repCnt.get() + "\n");
map.forEach((s, cnt) -> sb.append("##\t\t- " + String.format(Locale.ROOT, "%-12s %4d", s, cnt.get()) + "\n"));
});
sb.append("######### Solr op counts ##########\n");
simGetOpCounts().forEach((k, cnt) -> sb.append("##\t\t- " + String.format(Locale.ROOT, "%-14s %4d", k, cnt.get()) + "\n"));
sb.append("######### Autoscaling event counts ###########\n");
Map<String, Map<String, AtomicInteger>> counts = simGetEventCounts();
counts.forEach((trigger, map) -> {
sb.append("## * Trigger: " + trigger + "\n");
map.forEach((s, cnt) -> sb.append("##\t\t- " + String.format(Locale.ROOT, "%-11s %4d", s, cnt.get()) + "\n"));
});
return sb.toString();
}
/** /**
* Get the instance of {@link SolrResourceLoader} that is used by the cluster components. * Get the instance of {@link SolrResourceLoader} that is used by the cluster components.
*/ */
@ -333,6 +400,17 @@ public class SimCloudManager implements SolrCloudManager {
return new SolrClient() { return new SolrClient() {
@Override @Override
public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException { public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
if (collection != null) {
if (request instanceof AbstractUpdateRequest) {
((AbstractUpdateRequest)request).setParam("collection", collection);
} else if (request instanceof QueryRequest) {
ModifiableSolrParams params = new ModifiableSolrParams(request.getParams());
params.set("collection", collection);
request = new QueryRequest(params);
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "when collection != null only UpdateRequest and QueryRequest are supported: request=" + request + ", collection=" + collection);
}
}
SolrResponse rsp = SimCloudManager.this.request(request); SolrResponse rsp = SimCloudManager.this.request(request);
return rsp.getResponse(); return rsp.getResponse();
} }
@ -508,15 +586,18 @@ public class SimCloudManager implements SolrCloudManager {
incrementCount("update"); incrementCount("update");
// support only updates to the system collection // support only updates to the system collection
UpdateRequest ureq = (UpdateRequest)req; UpdateRequest ureq = (UpdateRequest)req;
if (ureq.getCollection() == null || !ureq.getCollection().equals(CollectionAdminParams.SYSTEM_COLL)) { String collection = ureq.getCollection();
throw new UnsupportedOperationException("Only .system updates are supported but got: " + req); if (collection != null && !collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
} // simulate an update
return clusterStateProvider.simUpdate(ureq);
} else {
List<SolrInputDocument> docs = ureq.getDocuments(); List<SolrInputDocument> docs = ureq.getDocuments();
if (docs != null) { if (docs != null) {
systemColl.addAll(docs); systemColl.addAll(docs);
} }
return new UpdateResponse(); return new UpdateResponse();
} }
}
// support only a specific subset of collection admin ops // support only a specific subset of collection admin ops
if (!(req instanceof CollectionAdminRequest)) { if (!(req instanceof CollectionAdminRequest)) {
throw new UnsupportedOperationException("Only some CollectionAdminRequest-s are supported: " + req.getClass().getName()); throw new UnsupportedOperationException("Only some CollectionAdminRequest-s are supported: " + req.getClass().getName());
@ -560,8 +641,12 @@ public class SimCloudManager implements SolrCloudManager {
} }
break; break;
case DELETE: case DELETE:
try {
clusterStateProvider.simDeleteCollection(req.getParams().get(CommonParams.NAME), clusterStateProvider.simDeleteCollection(req.getParams().get(CommonParams.NAME),
req.getParams().get(CommonAdminParams.ASYNC), results); req.getParams().get(CommonAdminParams.ASYNC), results);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
break; break;
case LIST: case LIST:
results.add("collections", clusterStateProvider.simListCollections()); results.add("collections", clusterStateProvider.simListCollections());

View File

@ -47,6 +47,8 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData; import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.client.solrj.impl.ClusterStateProvider; import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.ActionThrottle; import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.api.collections.AddReplicaCmd; import org.apache.solr.cloud.api.collections.AddReplicaCmd;
import org.apache.solr.cloud.api.collections.Assign; import org.apache.solr.cloud.api.collections.Assign;
@ -57,6 +59,7 @@ import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator; import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.cloud.overseer.ZkWriteCommand; import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.DocRouter;
@ -241,7 +244,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @return true if a node existed and was removed * @return true if a node existed and was removed
*/ */
public boolean simRemoveNode(String nodeId) throws Exception { public boolean simRemoveNode(String nodeId) throws Exception {
lock.lock(); lock.lockInterruptibly();
try { try {
Set<String> collections = new HashSet<>(); Set<String> collections = new HashSet<>();
// mark every replica on that node as down // mark every replica on that node as down
@ -296,14 +299,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
liveNodes.add(nodeId); liveNodes.add(nodeId);
createEphemeralLiveNode(nodeId); createEphemeralLiveNode(nodeId);
Set<String> collections = new HashSet<>(); Set<String> collections = new HashSet<>();
lock.lock(); lock.lockInterruptibly();
try { try {
setReplicaStates(nodeId, Replica.State.RECOVERING, collections); setReplicaStates(nodeId, Replica.State.RECOVERING, collections);
} finally { } finally {
lock.unlock(); lock.unlock();
} }
cloudManager.getTimeSource().sleep(1000); cloudManager.getTimeSource().sleep(1000);
lock.lock(); lock.lockInterruptibly();
try { try {
setReplicaStates(nodeId, Replica.State.ACTIVE, collections); setReplicaStates(nodeId, Replica.State.ACTIVE, collections);
} finally { } finally {
@ -389,7 +392,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new Exception("Wrong node (not " + nodeId + "): " + replicaInfo); throw new Exception("Wrong node (not " + nodeId + "): " + replicaInfo);
} }
lock.lock(); lock.lockInterruptibly();
try { try {
opDelay(replicaInfo.getCollection(), CollectionParams.CollectionAction.ADDREPLICA.name()); opDelay(replicaInfo.getCollection(), CollectionParams.CollectionAction.ADDREPLICA.name());
@ -435,7 +438,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
*/ */
public void simRemoveReplica(String nodeId, String coreNodeName) throws Exception { public void simRemoveReplica(String nodeId, String coreNodeName) throws Exception {
List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>()); List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>());
lock.lock(); lock.lockInterruptibly();
try { try {
for (int i = 0; i < replicas.size(); i++) { for (int i = 0; i < replicas.size(); i++) {
if (coreNodeName.equals(replicas.get(i).getName())) { if (coreNodeName.equals(replicas.get(i).getName())) {
@ -638,6 +641,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicaNum.getAndIncrement()); replicaNum.getAndIncrement());
try { try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName); replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
replicaProps.put("SEARCHER.searcher.numDocs", 0);
replicaProps.put("SEARCHER.searcher.maxDoc", 0);
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0), ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps); coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
cloudManager.submit(() -> { cloudManager.submit(() -> {
@ -662,6 +668,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
} }
}); });
}); });
// force recreation of collection states
collectionsStatesRef.set(null);
simRunLeaderElection(Collections.singleton(collectionName), true); simRunLeaderElection(Collections.singleton(collectionName), true);
if (waitForFinalState) { if (waitForFinalState) {
boolean finished = finalStateLatch.await(cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, 60, TimeUnit.MILLISECONDS), boolean finished = finalStateLatch.await(cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, 60, TimeUnit.MILLISECONDS),
@ -680,11 +688,11 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param async async id * @param async async id
* @param results results of the operation * @param results results of the operation
*/ */
public void simDeleteCollection(String collection, String async, NamedList results) throws IOException { public void simDeleteCollection(String collection, String async, NamedList results) throws Exception {
if (async != null) { if (async != null) {
results.add(CoreAdminParams.REQUESTID, async); results.add(CoreAdminParams.REQUESTID, async);
} }
lock.lock(); lock.lockInterruptibly();
try { try {
collProperties.remove(collection); collProperties.remove(collection);
sliceProperties.remove(collection); sliceProperties.remove(collection);
@ -722,7 +730,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* Remove all collections. * Remove all collections.
*/ */
public void simDeleteAllCollections() throws Exception { public void simDeleteAllCollections() throws Exception {
lock.lock(); lock.lockInterruptibly();
try { try {
nodeReplicaMap.clear(); nodeReplicaMap.clear();
collProperties.clear(); collProperties.clear();
@ -797,7 +805,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
String collectionName = message.getStr(COLLECTION_PROP); String collectionName = message.getStr(COLLECTION_PROP);
String sliceName = message.getStr(SHARD_ID_PROP); String sliceName = message.getStr(SHARD_ID_PROP);
ClusterState clusterState = getClusterState(); ClusterState clusterState = getClusterState();
lock.lock(); lock.lockInterruptibly();
try { try {
ZkWriteCommand cmd = new CollectionMutator(cloudManager).createShard(clusterState, message); ZkWriteCommand cmd = new CollectionMutator(cloudManager).createShard(clusterState, message);
if (cmd.noop) { if (cmd.noop) {
@ -865,6 +873,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
AtomicReference<String> sliceName = new AtomicReference<>(); AtomicReference<String> sliceName = new AtomicReference<>();
sliceName.set(message.getStr(SHARD_ID_PROP)); sliceName.set(message.getStr(SHARD_ID_PROP));
String splitKey = message.getStr("split.key"); String splitKey = message.getStr("split.key");
// always invalidate cached collection states to get up-to-date metrics
collectionsStatesRef.set(null);
ClusterState clusterState = getClusterState(); ClusterState clusterState = getClusterState();
DocCollection collection = clusterState.getCollection(collectionName); DocCollection collection = clusterState.getCollection(collectionName);
Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey); Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey);
@ -887,6 +899,18 @@ public class SimClusterStateProvider implements ClusterStateProvider {
PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true); PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
if (sessionWrapper != null) sessionWrapper.release(); if (sessionWrapper != null) sessionWrapper.release();
// adjust numDocs / deletedDocs / maxDoc
Replica leader = parentSlice.getLeader();
// XXX leader election may not have happened yet - should we require it?
if (leader == null) {
leader = parentSlice.getReplicas().iterator().next();
}
String numDocsStr = leader.getStr("SEARCHER.searcher.numDocs", "0");
long numDocs = Long.parseLong(numDocsStr);
long newNumDocs = numDocs / subSlices.size();
long remainder = numDocs % subSlices.size();
String remainderSlice = null;
for (ReplicaPosition replicaPosition : replicaPositions) { for (ReplicaPosition replicaPosition : replicaPositions) {
String subSliceName = replicaPosition.shard; String subSliceName = replicaPosition.shard;
String subShardNodeName = replicaPosition.node; String subShardNodeName = replicaPosition.node;
@ -897,15 +921,32 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicaProps.put(ZkStateReader.REPLICA_TYPE, replicaPosition.type.toString()); replicaProps.put(ZkStateReader.REPLICA_TYPE, replicaPosition.type.toString());
replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(subShardNodeName, "http")); replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(subShardNodeName, "http"));
long replicasNumDocs = newNumDocs;
if (remainderSlice == null) {
remainderSlice = subSliceName;
}
if (remainderSlice.equals(subSliceName)) { // only add to one sub slice
replicasNumDocs += remainder;
}
replicaProps.put("SEARCHER.searcher.numDocs", replicasNumDocs);
replicaProps.put("SEARCHER.searcher.maxDoc", replicasNumDocs);
replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0), ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps); solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
simAddReplica(replicaPosition.node, ri, false); simAddReplica(replicaPosition.node, ri, false);
} }
// mark the old slice as inactive // mark the old slice as inactive
lock.lockInterruptibly();
try {
Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()) Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>()); .computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
props.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString()); props.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs())); props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
// XXX also mark replicas as down? currently SplitShardCmd doesn't do this
} finally {
lock.unlock();
}
// add slice props // add slice props
for (int i = 0; i < subRanges.size(); i++) { for (int i = 0; i < subRanges.size(); i++) {
String subSlice = subSlices.get(i); String subSlice = subSlices.get(i);
@ -915,8 +956,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
sliceProps.put(Slice.RANGE, range); sliceProps.put(Slice.RANGE, range);
sliceProps.put(Slice.PARENT, sliceName.get()); sliceProps.put(Slice.PARENT, sliceName.get());
sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString()); sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs())); sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
} }
collectionsStatesRef.set(null);
simRunLeaderElection(Collections.singleton(collectionName), true); simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", ""); results.add("success", "");
@ -945,7 +987,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
opDelay(collectionName, CollectionParams.CollectionAction.DELETESHARD.name()); opDelay(collectionName, CollectionParams.CollectionAction.DELETESHARD.name());
lock.lock(); lock.lockInterruptibly();
try { try {
sliceProperties.computeIfAbsent(collectionName, coll -> new ConcurrentHashMap<>()).remove(sliceName); sliceProperties.computeIfAbsent(collectionName, coll -> new ConcurrentHashMap<>()).remove(sliceName);
nodeReplicaMap.forEach((n, replicas) -> { nodeReplicaMap.forEach((n, replicas) -> {
@ -966,6 +1008,122 @@ public class SimClusterStateProvider implements ClusterStateProvider {
} }
} }
/**
* Simulate an update by modifying replica metrics.
* The following core metrics are updated:
* <ul>
* <li><code>SEARCHER.searcher.numDocs</code> - increased by added docs, decreased by deleteById and deleteByQuery</li>
* <li><code>SEARCHER.searcher.deletedDocs</code> - decreased by deleteById and deleteByQuery by up to <code>numDocs</code></li>
* <li><code>SEARCHER.searcher.maxDoc</code> - always increased by the number of added docs.</li>
* </ul>
* <p>IMPORTANT limitations:</p>
* <ul>
* <li>document replacements are always counted as new docs</li>
* <li>delete by ID always succeeds (unless numDocs == 0)</li>
* <li>deleteByQuery is not supported unless the query is <code>*:*</code></li>
* </ul>
* @param req update request. This request MUST have the <code>collection</code> param set.
* @return {@link UpdateResponse}
* @throws SolrException on errors, such as nonexistent collection or unsupported deleteByQuery
*/
public UpdateResponse simUpdate(UpdateRequest req) throws SolrException, InterruptedException, IOException {
String collection = req.getCollection();
if (collection == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
}
if (!simListCollections().contains(collection)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collection + "' doesn't exist");
}
// always reset first to get the current metrics - it's easier than to keep matching
// Replica with ReplicaInfo where the current real counts are stored
collectionsStatesRef.set(null);
DocCollection coll = getClusterState().getCollection(collection);
DocRouter router = coll.getRouter();
boolean modified = false;
lock.lockInterruptibly();
try {
List<String> deletes = req.getDeleteById();
if (deletes != null && !deletes.isEmpty()) {
for (String id : deletes) {
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
// NOTE: we don't use getProperty because it uses PROPERTY_PROP_PREFIX
String numDocsStr = s.getLeader().getStr("SEARCHER.searcher.numDocs");
if (numDocsStr == null) {
LOG.debug("-- no docs in " + s.getLeader());
continue;
}
long numDocs = Long.parseLong(numDocsStr);
if (numDocs == 0) {
LOG.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
continue;
}
if (numDocsStr != null) {
modified = true;
try {
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", -1, true, false);
} catch (Exception e) {
throw new IOException(e);
}
}
}
}
deletes = req.getDeleteQuery();
if (deletes != null && !deletes.isEmpty()) {
for (String q : deletes) {
if (!"*:*".equals(q)) {
throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
}
for (Slice s : coll.getSlices()) {
String numDocsStr = s.getLeader().getStr("SEARCHER.searcher.numDocs");
if (numDocsStr == null) {
continue;
}
long numDocs = Long.parseLong(numDocsStr);
if (numDocs == 0) {
continue;
}
modified = true;
try {
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", numDocs, false, false);
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 0, false, false);
} catch (Exception e) {
throw new IOException(e);
}
}
}
}
List<SolrInputDocument> docs = req.getDocuments();
if (docs != null && !docs.isEmpty()) {
for (SolrInputDocument doc : docs) {
String id = (String) doc.getFieldValue("id");
if (id == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
}
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
modified = true;
try {
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 1, true, false);
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.maxDoc", 1, true, false);
// Policy reuses this value and expects it to be in GB units!!!
// the idea here is to increase the index size by 500 bytes with each doc
// simSetShardValue(collection, s.getName(), "INDEX.sizeInBytes", 500, true, false);
} catch (Exception e) {
throw new IOException(e);
}
}
}
if (modified) {
collectionsStatesRef.set(null);
}
} finally {
lock.unlock();
}
return new UpdateResponse();
}
/** /**
* Saves cluster properties to clusterprops.json. * Saves cluster properties to clusterprops.json.
* @return current properties * @return current properties
@ -988,7 +1146,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param properties properties to set * @param properties properties to set
*/ */
public void simSetClusterProperties(Map<String, Object> properties) throws Exception { public void simSetClusterProperties(Map<String, Object> properties) throws Exception {
lock.lock(); lock.lockInterruptibly();
try { try {
clusterProperties.clear(); clusterProperties.clear();
if (properties != null) { if (properties != null) {
@ -1007,7 +1165,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param value property value * @param value property value
*/ */
public void simSetClusterProperty(String key, Object value) throws Exception { public void simSetClusterProperty(String key, Object value) throws Exception {
lock.lock(); lock.lockInterruptibly();
try { try {
if (value != null) { if (value != null) {
clusterProperties.put(key, value); clusterProperties.put(key, value);
@ -1026,7 +1184,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param properties properties * @param properties properties
*/ */
public void simSetCollectionProperties(String coll, Map<String, Object> properties) throws Exception { public void simSetCollectionProperties(String coll, Map<String, Object> properties) throws Exception {
lock.lock(); lock.lockInterruptibly();
try { try {
if (properties == null) { if (properties == null) {
collProperties.remove(coll); collProperties.remove(coll);
@ -1049,7 +1207,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
*/ */
public void simSetCollectionProperty(String coll, String key, String value) throws Exception { public void simSetCollectionProperty(String coll, String key, String value) throws Exception {
Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>()); Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
lock.lock(); lock.lockInterruptibly();
try { try {
if (value == null) { if (value == null) {
props.remove(key); props.remove(key);
@ -1070,7 +1228,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
*/ */
public void simSetSliceProperties(String coll, String slice, Map<String, Object> properties) throws Exception { public void simSetSliceProperties(String coll, String slice, Map<String, Object> properties) throws Exception {
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new HashMap<>()).computeIfAbsent(slice, s -> new HashMap<>()); Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new HashMap<>()).computeIfAbsent(slice, s -> new HashMap<>());
lock.lock(); lock.lockInterruptibly();
try { try {
sliceProps.clear(); sliceProps.clear();
if (properties != null) { if (properties != null) {
@ -1089,7 +1247,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param value property value * @param value property value
*/ */
public void simSetCollectionValue(String collection, String key, Object value) throws Exception { public void simSetCollectionValue(String collection, String key, Object value) throws Exception {
simSetCollectionValue(collection, key, value, false); simSetCollectionValue(collection, key, value, false, false);
} }
/** /**
@ -1100,8 +1258,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param divide if the value is a {@link Number} and this param is true, then the value will be evenly * @param divide if the value is a {@link Number} and this param is true, then the value will be evenly
* divided by the number of replicas. * divided by the number of replicas.
*/ */
public void simSetCollectionValue(String collection, String key, Object value, boolean divide) throws Exception { public void simSetCollectionValue(String collection, String key, Object value, boolean delta, boolean divide) throws Exception {
simSetShardValue(collection, null, key, value, divide); simSetShardValue(collection, null, key, value, delta, divide);
} }
/** /**
@ -1112,7 +1270,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param value property value * @param value property value
*/ */
public void simSetShardValue(String collection, String shard, String key, Object value) throws Exception { public void simSetShardValue(String collection, String shard, String key, Object value) throws Exception {
simSetShardValue(collection, shard, key, value, false); simSetShardValue(collection, shard, key, value, false, false);
} }
/** /**
@ -1121,10 +1279,12 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param shard shard name. If null then all shards will be affected. * @param shard shard name. If null then all shards will be affected.
* @param key property name * @param key property name
* @param value property value * @param value property value
* @param delta if true then treat the numeric value as delta to add to the existing value
* (or set the value to delta if missing)
* @param divide if the value is a {@link Number} and this is true, then the value will be evenly * @param divide if the value is a {@link Number} and this is true, then the value will be evenly
* divided by the number of replicas. * divided by the number of replicas.
*/ */
public void simSetShardValue(String collection, String shard, String key, Object value, boolean divide) throws Exception { public void simSetShardValue(String collection, String shard, String key, Object value, boolean delta, boolean divide) throws Exception {
List<ReplicaInfo> infos = new ArrayList<>(); List<ReplicaInfo> infos = new ArrayList<>();
nodeReplicaMap.forEach((n, replicas) -> { nodeReplicaMap.forEach((n, replicas) -> {
replicas.forEach(r -> { replicas.forEach(r -> {
@ -1140,15 +1300,39 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist."); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist.");
} }
if (divide && value != null && (value instanceof Number)) { if (divide && value != null && (value instanceof Number)) {
if ((value instanceof Long) || (value instanceof Integer)) {
value = ((Number) value).longValue() / infos.size();
} else {
value = ((Number) value).doubleValue() / infos.size(); value = ((Number) value).doubleValue() / infos.size();
} }
}
for (ReplicaInfo r : infos) { for (ReplicaInfo r : infos) {
synchronized (r) { synchronized (r) {
if (value == null) { if (value == null) {
r.getVariables().remove(key); r.getVariables().remove(key);
} else {
if (delta) {
Object prevValue = r.getVariables().get(key);
if (prevValue != null) {
if ((prevValue instanceof Number) && (value instanceof Number)) {
if (((prevValue instanceof Long) || (prevValue instanceof Integer)) &&
((value instanceof Long) || (value instanceof Integer))) {
Long newValue = ((Number)prevValue).longValue() + ((Number)value).longValue();
r.getVariables().put(key, newValue);
} else {
Double newValue = ((Number)prevValue).doubleValue() + ((Number)value).doubleValue();
r.getVariables().put(key, newValue);
}
} else {
throw new UnsupportedOperationException("delta cannot be applied to non-numeric values: " + prevValue + " and " + value);
}
} else { } else {
r.getVariables().put(key, value); r.getVariables().put(key, value);
} }
} else {
r.getVariables().put(key, value);
}
}
} }
} }
} }
@ -1172,9 +1356,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* List collections. * List collections.
* @return list of existing collections. * @return list of existing collections.
*/ */
public List<String> simListCollections() { public List<String> simListCollections() throws InterruptedException {
final Set<String> collections = new HashSet<>(); final Set<String> collections = new HashSet<>();
lock.lock(); lock.lockInterruptibly();
try { try {
nodeReplicaMap.forEach((n, replicas) -> { nodeReplicaMap.forEach((n, replicas) -> {
replicas.forEach(ri -> collections.add(ri.getCollection())); replicas.forEach(ri -> collections.add(ri.getCollection()));
@ -1216,6 +1400,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
return state; return state;
} }
// this method uses a simple cache in collectionsStatesRef. Operations that modify
// cluster state should always reset this cache so that the changes become visible
private Map<String, DocCollection> getCollectionStates() { private Map<String, DocCollection> getCollectionStates() {
Map<String, DocCollection> collectionStates = collectionsStatesRef.get(); Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
if (collectionStates != null) { if (collectionStates != null) {
@ -1263,7 +1449,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
slices.put(s, slice); slices.put(s, slice);
}); });
Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>()); Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
DocCollection dc = new DocCollection(coll, slices, collProps, DocRouter.DEFAULT, clusterStateVersion, ZkStateReader.CLUSTER_STATE); Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
res.put(coll, dc); res.put(coll, dc);
}); });
collectionsStatesRef.set(res); collectionsStatesRef.set(res);

View File

@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
/** /**
* Simulated {@link NodeStateProvider}. * Simulated {@link NodeStateProvider}.
* Note: in order to setup node-level metrics use {@link #simSetNodeValues(String, Map)}. However, in order * Note: in order to setup node-level metrics use {@link #simSetNodeValues(String, Map)}. However, in order
* to setup core-level metrics use {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean)}. * to setup core-level metrics use {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean, boolean)}.
*/ */
public class SimNodeStateProvider implements NodeStateProvider { public class SimNodeStateProvider implements NodeStateProvider {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -204,7 +204,7 @@ public class SimNodeStateProvider implements NodeStateProvider {
/** /**
* Simulate getting replica metrics values. This uses per-replica properties set in * Simulate getting replica metrics values. This uses per-replica properties set in
* {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean)} and * {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean, boolean)} and
* similar methods. * similar methods.
* @param node node id * @param node node id
* @param tags metrics names * @param tags metrics names

View File

@ -22,15 +22,9 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
@ -79,59 +73,7 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
public void tearDown() throws Exception { public void tearDown() throws Exception {
super.tearDown(); super.tearDown();
if (cluster != null) { if (cluster != null) {
log.info("\n"); log.info(cluster.dumpClusterState(false));
log.info("#############################################");
log.info("############ FINAL CLUSTER STATS ############");
log.info("#############################################\n");
log.info("## Live nodes:\t\t" + cluster.getLiveNodesSet().size());
int emptyNodes = 0;
int maxReplicas = 0;
int minReplicas = Integer.MAX_VALUE;
Map<String, Map<Replica.State, AtomicInteger>> replicaStates = new TreeMap<>();
int numReplicas = 0;
for (String node : cluster.getLiveNodesSet().get()) {
List<ReplicaInfo> replicas = cluster.getSimClusterStateProvider().simGetReplicaInfos(node);
numReplicas += replicas.size();
if (replicas.size() > maxReplicas) {
maxReplicas = replicas.size();
}
if (minReplicas > replicas.size()) {
minReplicas = replicas.size();
}
for (ReplicaInfo ri : replicas) {
replicaStates.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
.computeIfAbsent(ri.getState(), s -> new AtomicInteger())
.incrementAndGet();
}
if (replicas.isEmpty()) {
emptyNodes++;
}
}
if (minReplicas == Integer.MAX_VALUE) {
minReplicas = 0;
}
log.info("## Empty nodes:\t" + emptyNodes);
Set<String> deadNodes = cluster.getSimNodeStateProvider().simGetDeadNodes();
log.info("## Dead nodes:\t\t" + deadNodes.size());
deadNodes.forEach(n -> log.info("##\t\t" + n));
log.info("## Collections:\t" + cluster.getSimClusterStateProvider().simListCollections());
log.info("## Max replicas per node:\t" + maxReplicas);
log.info("## Min replicas per node:\t" + minReplicas);
log.info("## Total replicas:\t\t" + numReplicas);
replicaStates.forEach((c, map) -> {
AtomicInteger repCnt = new AtomicInteger();
map.forEach((s, cnt) -> repCnt.addAndGet(cnt.get()));
log.info("## * " + c + "\t\t" + repCnt.get());
map.forEach((s, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-12s %4d", s, cnt.get())));
});
log.info("######### Final Solr op counts ##########");
cluster.simGetOpCounts().forEach((k, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-14s %4d", k, cnt.get())));
log.info("######### Autoscaling event counts ###########");
Map<String, Map<String, AtomicInteger>> counts = cluster.simGetEventCounts();
counts.forEach((trigger, map) -> {
log.info("## * Trigger: " + trigger);
map.forEach((s, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-11s %4d", s, cnt.get())));
});
} }
} }

View File

@ -540,7 +540,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
String metricName = "QUERY./select.requestTimes:1minRate"; String metricName = "QUERY./select.requestTimes:1minRate";
// simulate search traffic // simulate search traffic
cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, true); cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, false, true);
// now define the trigger. doing it earlier may cause partial events to be generated (where only some // now define the trigger. doing it earlier may cause partial events to be generated (where only some
// nodes / replicas exceeded the threshold). // nodes / replicas exceeded the threshold).
@ -592,7 +592,19 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
ops.forEach(op -> { ops.forEach(op -> {
assertEquals(CollectionParams.CollectionAction.ADDREPLICA, op.getAction()); assertEquals(CollectionParams.CollectionAction.ADDREPLICA, op.getAction());
assertEquals(1, op.getHints().size()); assertEquals(1, op.getHints().size());
Pair<String, String> hint = (Pair<String, String>)op.getHints().get(Suggester.Hint.COLL_SHARD); Object o = op.getHints().get(Suggester.Hint.COLL_SHARD);
// this may be a pair or a HashSet of pairs with size 1
Pair<String, String> hint = null;
if (o instanceof Pair) {
hint = (Pair<String, String>)o;
} else if (o instanceof Set) {
assertEquals("unexpected number of hints: " + o, 1, ((Set)o).size());
o = ((Set)o).iterator().next();
assertTrue("unexpected hint: " + o, o instanceof Pair);
hint = (Pair<String, String>)o;
} else {
fail("unexpected hints: " + o);
}
assertNotNull(hint); assertNotNull(hint);
assertEquals(collectionName, hint.first()); assertEquals(collectionName, hint.first());
assertEquals("shard1", hint.second()); assertEquals("shard1", hint.second());

View File

@ -1192,7 +1192,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
// solrClient.query(COLL1, query); // solrClient.query(COLL1, query);
// } // }
cluster.getSimClusterStateProvider().simSetCollectionValue(COLL1, "QUERY./select.requestTimes:1minRate", 500, true); cluster.getSimClusterStateProvider().simSetCollectionValue(COLL1, "QUERY./select.requestTimes:1minRate", 500, false, true);
boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS); boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("The trigger did not fire at all", await); assertTrue("The trigger did not fire at all", await);

View File

@ -34,6 +34,8 @@ Currently the following event types (and corresponding trigger implementations)
* `nodeAdded`: generated when a new node joins the cluster * `nodeAdded`: generated when a new node joins the cluster
* `nodeLost`: generated when a node leaves the cluster * `nodeLost`: generated when a node leaves the cluster
* `metric`: generated when the configured metric crosses a configured lower or upper threshold value * `metric`: generated when the configured metric crosses a configured lower or upper threshold value
* `indexSize`: generated when a shard size (defined as index size in bytes or number of documents)
exceeds upper or lower threshold values
* `searchRate`: generated when the 1-minute average search rate exceeds configured upper threshold * `searchRate`: generated when the 1-minute average search rate exceeds configured upper threshold
* `scheduled`: generated according to a scheduled time period such as every 24 hours etc * `scheduled`: generated according to a scheduled time period such as every 24 hours etc
@ -105,6 +107,81 @@ This trigger supports the following configuration:
} }
---- ----
== Index Size Trigger
This trigger can be used for monitoring the size of collection shards, measured either by the
number of documents in a shard or the physical size of the shard's index in bytes.
When either of the upper thresholds is exceeded the trigger will generate an event with
a (configurable) requested operation to perform on the offending shards - by default
this is a SPLITSHARD operation.
Similarly, when either of the lower thresholds is exceeded the trigger will generate an
event with a (configurable) requested operation to perform on two of the smallest
shards - by default this is a MERGESHARDS operation (which is currently ignored because
it's not yet implemented - SOLR-9407)
Additionally, monitoring can be restricted to a list of collections - by default
all collections are monitored.
This trigger supports the following configuration parameters (all thresholds are exclusive):
`aboveBytes`:: upper threshold in bytes. This value is compared to the `INDEX.sizeInBytes` metric.
`belowBytes`:: lower threshold in bytes. Note that this value should be at least 2x smaller than
`aboveBytes`
`aboveDocs`:: upper threshold expressed as the number of documents. This value is compared with `SEARCHER.searcher.numDocs` metric.
Note: due to the way Lucene indexes work a shard may exceed the `aboveBytes` threshold
even if the number of documents is relatively small, because replaced and deleted documents keep
occupying disk space until they are actually removed during Lucene index merging.
`belowDocs`:: lower threshold expressed as the number of documents.
`aboveOp`:: operation to request when an upper threshold is exceeded. If not specified the
default value is `SPLITSHARD`.
`belowOp`:: operation to request when a lower threshold is exceeded. If not specified
the default value is `MERGESHARDS` (but see the note above).
`collections`:: comma-separated list of collection names that this trigger should monitor. If not
specified or empty all collections are monitored.
Events generated by this trigger contain additional details about the shards
that exceeded thresholds and the types of violations (upper / lower bounds, bytes / docs metrics).
.Example:
This configuration specifies an index size trigger that monitors collections "test1" and "test2",
with both bytes (1GB) and number of docs (1 mln) upper limits, and a custom `belowOp`
operation `NONE` (which still can be monitored and acted upon by an appropriate trigger listener):
[source,json]
----
{
"set-trigger": {
"name" : "index_size_trigger",
"event" : "indexSize",
"collections" : "test1,test2",
"aboveBytes" : 1000000000,
"aboveDocs" : 1000000000,
"belowBytes" : 200000,
"belowDocs" : 200000,
"belopOp" : "NONE",
"waitFor" : "1m",
"enabled" : true,
"actions" : [
{
"name" : "compute_plan",
"class": "solr.ComputePlanAction"
},
{
"name" : "execute_plan",
"class": "solr.ExecutePlanAction"
}
]
}
}
----
== Search Rate Trigger == Search Rate Trigger
The search rate trigger can be used for monitoring 1-minute average search rates in a selected The search rate trigger can be used for monitoring 1-minute average search rates in a selected

View File

@ -466,7 +466,10 @@ public class Policy implements MapWriter {
static { static {
ops.put(CollectionAction.ADDREPLICA, () -> new AddReplicaSuggester()); ops.put(CollectionAction.ADDREPLICA, () -> new AddReplicaSuggester());
ops.put(CollectionAction.DELETEREPLICA, () -> new UnsupportedSuggester(CollectionAction.DELETEREPLICA));
ops.put(CollectionAction.MOVEREPLICA, () -> new MoveReplicaSuggester()); ops.put(CollectionAction.MOVEREPLICA, () -> new MoveReplicaSuggester());
ops.put(CollectionAction.SPLITSHARD, () -> new SplitShardSuggester());
ops.put(CollectionAction.MERGESHARDS, () -> new UnsupportedSuggester(CollectionAction.MERGESHARDS));
} }
public Map<String, List<Clause>> getPolicies() { public Map<String, List<Clause>> getPolicies() {

View File

@ -66,6 +66,10 @@ public class ReplicaInfo implements MapWriter {
this.node = node; this.node = node;
} }
public Object clone() {
return new ReplicaInfo(name, core, collection, shard, type, node, variables);
}
@Override @Override
public void writeMap(EntryWriter ew) throws IOException { public void writeMap(EntryWriter ew) throws IOException {
ew.put(name, (MapWriter) ew1 -> { ew.put(name, (MapWriter) ew1 -> {

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.util.Collections;
import java.util.Set;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.util.Pair;
/**
* This suggester produces a SPLITSHARD request using provided {@link org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint#COLL_SHARD} value.
*/
class SplitShardSuggester extends Suggester {
@Override
SolrRequest init() {
Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
if (shards.isEmpty()) {
throw new RuntimeException("split-shard requires 'collection' and 'shard'");
}
if (shards.size() > 1) {
throw new RuntimeException("split-shard requires exactly one pair of 'collection' and 'shard'");
}
Pair<String, String> collShard = shards.iterator().next();
return CollectionAdminRequest.splitShard(collShard.first()).setShardName(collShard.second());
}
}

View File

@ -28,5 +28,6 @@ public enum TriggerEventType {
SEARCHRATE, SEARCHRATE,
INDEXRATE, INDEXRATE,
INVALID, INVALID,
METRIC METRIC,
INDEXSIZE
} }

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.common.params.CollectionParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This suggester simply logs the request but does not produce any suggestions.
*/
public class UnsupportedSuggester extends Suggester {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final CollectionParams.CollectionAction action;
public static UnsupportedSuggester get(Policy.Session session, CollectionParams.CollectionAction action) {
UnsupportedSuggester suggester = new UnsupportedSuggester(action);
suggester._init(session);
return suggester;
}
public UnsupportedSuggester(CollectionParams.CollectionAction action) {
this.action = action;
}
@Override
public CollectionParams.CollectionAction getAction() {
return action;
}
@Override
SolrRequest init() {
log.warn("Unsupported suggester for action " + action + " with hints " + hints + " - no suggestion available");
return null;
}
@Override
public SolrRequest getSuggestion() {
return null;
}
}

View File

@ -119,7 +119,9 @@ public interface CollectionParams {
REPLACENODE(true, LockLevel.NONE), REPLACENODE(true, LockLevel.NONE),
DELETENODE(true, LockLevel.NONE), DELETENODE(true, LockLevel.NONE),
MOCK_REPLICA_TASK(false, LockLevel.REPLICA), MOCK_REPLICA_TASK(false, LockLevel.REPLICA),
NONE(false, LockLevel.NONE) NONE(false, LockLevel.NONE),
// TODO: not implemented yet
MERGESHARDS(true, LockLevel.SHARD)
; ;
public final boolean isWrite; public final boolean isWrite;

View File

@ -281,7 +281,7 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
/** /**
* Return a {@link CollectionStatePredicate} that returns true if a collection has the expected * Return a {@link CollectionStatePredicate} that returns true if a collection has the expected
* number of shards and replicas * number of active shards and active replicas
*/ */
public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) { public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
return (liveNodes, collectionState) -> { return (liveNodes, collectionState) -> {