mirror of https://github.com/apache/lucene.git
SOLR-13831: Support defining arbitrary autoscaling simulation scenarios.
commit c1174dc0d6
parent 98cdac82a1
@@ -107,6 +107,8 @@ Improvements
 
 * SOLR-13731: 'javabin' must support a 1:1 mapping of the JSON update format (noble)
 
+* SOLR-13831: Support defining arbitrary autoscaling simulation scenarios. (ab)
+
 Optimizations
 ---------------------
 
@@ -244,12 +244,12 @@ public class CloudUtil {
                                                      boolean requireLeaders) {
     return (liveNodes, collectionState) -> {
       if (collectionState == null) {
-        log.info("-- null collection");
+        log.debug("-- null collection");
         return false;
       }
       Collection<Slice> slices = withInactive ? collectionState.getSlices() : collectionState.getActiveSlices();
       if (slices.size() != expectedShards) {
-        log.info("-- wrong number of slices for collection {}, expected={}, found={}: {}", collectionState.getName(), expectedShards, collectionState.getSlices().size(), collectionState.getSlices());
+        log.debug("-- wrong number of slices for collection {}, expected={}, found={}: {}", collectionState.getName(), expectedShards, collectionState.getSlices().size(), collectionState.getSlices());
         return false;
       }
       Set<String> leaderless = new HashSet<>();
@@ -268,7 +268,7 @@ public class CloudUtil {
           activeReplicas++;
         }
         if (activeReplicas != expectedReplicas) {
-          log.info("-- wrong number of active replicas for collection {} in slice {}, expected={}, found={}", collectionState.getName(), slice.getName(), expectedReplicas, activeReplicas);
+          log.debug("-- wrong number of active replicas for collection {} in slice {}, expected={}, found={}", collectionState.getName(), slice.getName(), expectedReplicas, activeReplicas);
           return false;
         }
       }
@@ -595,7 +595,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
   private static String fullName = SystemLogListener.class.getName();
   private static String solrName = "solr." + SystemLogListener.class.getSimpleName();
 
-  static AutoScalingConfig withSystemLogListener(AutoScalingConfig autoScalingConfig, String triggerName) {
+  public static AutoScalingConfig withSystemLogListener(AutoScalingConfig autoScalingConfig, String triggerName) {
     Map<String, AutoScalingConfig.TriggerListenerConfig> configs = autoScalingConfig.getTriggerListenerConfigs();
     for (AutoScalingConfig.TriggerListenerConfig cfg : configs.values()) {
       if (triggerName.equals(cfg.trigger)) {
@@ -134,6 +134,8 @@ public class ScheduledTriggers implements Closeable {
 
   private final TriggerListeners listeners;
 
+  private final List<TriggerListener> additionalListeners = new ArrayList<>();
+
   private AutoScalingConfig autoScalingConfig;
 
   public ScheduledTriggers(SolrResourceLoader loader, SolrCloudManager cloudManager) {
@@ -552,6 +554,22 @@ public class ScheduledTriggers implements Closeable {
     log.debug("ScheduledTriggers closed completely");
   }
 
+  /**
+   * Add a temporary listener for internal use (tests, simulation).
+   * @param listener listener instance
+   */
+  public void addAdditionalListener(TriggerListener listener) {
+    listeners.addAdditionalListener(listener);
+  }
+
+  /**
+   * Remove a temporary listener for internal use (tests, simulation).
+   * @param listener listener instance
+   */
+  public void removeAdditionalListener(TriggerListener listener) {
+    listeners.removeAdditionalListener(listener);
+  }
+
   private class TriggerWrapper implements Runnable, Closeable {
     AutoScaling.Trigger trigger;
     ScheduledFuture<?> scheduledFuture;
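These hooks let callers attach event listeners to a running ScheduledTriggers instance at runtime; the scenario commands `event_listener` and `wait_event` (documented in the reference-guide changes below) presumably build on them. A scenario fragment from the examples later in this commit that exercises this capability:

[source,text]
----
event_listener trigger=.auto_add_replicas&stage=SUCCEEDED
kill_nodes node=${_random_node_}
wait_event trigger=.auto_add_replicas&wait=60
----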
@@ -657,6 +675,7 @@ public class ScheduledTriggers implements Closeable {
   private class TriggerListeners {
     Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
     Map<String, TriggerListener> listenersPerName = new HashMap<>();
+    List<TriggerListener> additionalListeners = new ArrayList<>();
     ReentrantLock updateLock = new ReentrantLock();
 
     public TriggerListeners() {
@@ -680,6 +699,41 @@ public class ScheduledTriggers implements Closeable {
       return new TriggerListeners(listenersPerStage, listenersPerName);
     }
 
+    public void addAdditionalListener(TriggerListener listener) {
+      updateLock.lock();
+      try {
+        AutoScalingConfig.TriggerListenerConfig config = listener.getConfig();
+        for (TriggerEventProcessorStage stage : config.stages) {
+          addPerStage(config.trigger, stage, listener);
+        }
+        // add also for beforeAction / afterAction TriggerStage
+        if (!config.beforeActions.isEmpty()) {
+          addPerStage(config.trigger, TriggerEventProcessorStage.BEFORE_ACTION, listener);
+        }
+        if (!config.afterActions.isEmpty()) {
+          addPerStage(config.trigger, TriggerEventProcessorStage.AFTER_ACTION, listener);
+        }
+        additionalListeners.add(listener);
+      } finally {
+        updateLock.unlock();
+      }
+    }
+
+    public void removeAdditionalListener(TriggerListener listener) {
+      updateLock.lock();
+      try {
+        listenersPerName.remove(listener.getConfig().name);
+        listenersPerStage.forEach((trigger, perStage) -> {
+          perStage.forEach((stage, listeners) -> {
+            listeners.remove(listener);
+          });
+        });
+        additionalListeners.remove(listener);
+      } finally {
+        updateLock.unlock();
+      }
+    }
+
     void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
       updateLock.lock();
       // we will recreate this from scratch
@@ -756,6 +810,13 @@ public class ScheduledTriggers implements Closeable {
             addPerStage(config.trigger, TriggerEventProcessorStage.AFTER_ACTION, listener);
           }
         }
+        // re-add additional listeners
+        List<TriggerListener> additional = new ArrayList<>(additionalListeners);
+        additionalListeners.clear();
+        for (TriggerListener listener : additional) {
+          addAdditionalListener(listener);
+        }
+
       } finally {
         updateLock.unlock();
       }
@@ -844,11 +844,14 @@ public class SimCloudManager implements SolrCloudManager {
     String a = params != null ? params.get(CoreAdminParams.ACTION) : null;
     SolrResponse rsp = new SolrResponseBase();
     rsp.setResponse(new NamedList<>());
+    String path = params != null ? params.get("path") : null;
     if (!(req instanceof CollectionAdminRequest)) {
       // maybe a V2Request?
       if (req instanceof V2Request) {
         params = SimUtils.v2AdminRequestToV1Params((V2Request)req);
         a = params.get(CoreAdminParams.ACTION);
+      } else if (path != null && (path.startsWith("/admin/") || path.startsWith("/cluster/"))) {
+        // pass it through, it's likely a generic request containing admin params
       } else {
         throw new UnsupportedOperationException("Only some CollectionAdminRequest-s are supported: " + req.getClass().getName() + ": " + req.getPath() + " " + req.getParams());
       }
@@ -700,6 +700,19 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       cloudManager.getTimeSource().sleep(delays.get(op));
     }
 
+  public void simSetOpDelays(String collection, Map<String, Long> delays) {
+    Map<String, Long> currentDelays = opDelays.getOrDefault(collection, Collections.emptyMap());
+    Map<String, Long> newDelays = new HashMap<>(currentDelays);
+    delays.forEach((k, v) -> {
+      if (v == null) {
+        newDelays.remove(k);
+      } else {
+        newDelays.put(k, v);
+      }
+    });
+    opDelays.put(collection, newDelays);
+  }
+
   /**
    * Simulate running a shard leader election. This operation is a no-op if a leader already exists.
    * If a new leader is elected the cluster state is saved.
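A hedged sketch of how a caller might drive the new `simSetOpDelays` method; the wrapper method, the `SimClusterStateProvider` instance, and the op names here are assumptions for illustration, not part of this commit. Note that a `null` value clears a previously set delay:

[source,java]
----
import java.util.HashMap;
import java.util.Map;

// Sketch: slow down one simulated collection operation and clear another.
void adjustSimulatedDelays(SimClusterStateProvider clusterStateProvider) {
  Map<String, Long> delays = new HashMap<>();
  delays.put("MOVEREPLICA", 5000L);   // op name assumed; adds a 5s simulated delay
  delays.put("DELETEREPLICA", null);  // a null value removes any previously set delay
  clusterStateProvider.simSetOpDelays("testCollection", delays);
}
----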
(File diff suppressed because it is too large.)
@@ -134,17 +134,20 @@ public class SimUtils {
     }
     allReplicaInfos.keySet().forEach(collection -> {
       Set<String> infosCores = allReplicaInfos.getOrDefault(collection, Collections.emptyMap()).keySet();
-      Set<String> csCores = allReplicas.getOrDefault(collection, Collections.emptyMap()).keySet();
+      Map<String, Replica> replicas = allReplicas.getOrDefault(collection, Collections.emptyMap());
+      Set<String> csCores = replicas.keySet();
       if (!infosCores.equals(csCores)) {
         Set<String> notInClusterState = infosCores.stream()
             .filter(k -> !csCores.contains(k))
             .collect(Collectors.toSet());
         Set<String> notInNodeProvider = csCores.stream()
-            .filter(k -> !infosCores.contains(k))
+            .filter(k -> !infosCores.contains(k) && replicas.get(k).isActive(solrCloudManager.getClusterStateProvider().getLiveNodes()))
             .collect(Collectors.toSet());
-        throw new RuntimeException("Mismatched replica data between ClusterState and NodeStateProvider:\n\t" +
-            "replica not in ClusterState: " + notInClusterState + "\n\t" +
-            "replica not in NodeStateProvider: " + notInNodeProvider);
+        if (!notInClusterState.isEmpty() || !notInNodeProvider.isEmpty()) {
+          throw new RuntimeException("Mismatched replica data for collection " + collection + " between ClusterState and NodeStateProvider:\n\t" +
+              "replica in NodeStateProvider but not in ClusterState: " + notInClusterState + "\n\t" +
+              "replica in ClusterState but not in NodeStateProvider: " + notInNodeProvider);
+        }
       }
     });
     // verify all replicas have size info
@@ -349,6 +352,9 @@ public class SimUtils {
     }
     String a = cmd.keySet().iterator().next();
     ModifiableSolrParams params = new ModifiableSolrParams();
+    if (req.getParams() != null) {
+      params.add(req.getParams());
+    }
     params.add(CollectionAdminParams.COLLECTION, path.substring(3));
     if (req.getParams() != null) {
       params.add(req.getParams());
@@ -116,6 +116,7 @@ import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.autoscaling.sim.NoopDistributedQueueFactory;
 import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
+import org.apache.solr.cloud.autoscaling.sim.SimScenario;
 import org.apache.solr.cloud.autoscaling.sim.SimUtils;
 import org.apache.solr.cloud.autoscaling.sim.SnapshotCloudManager;
 import org.apache.solr.common.MapWriter;
@@ -930,11 +931,16 @@ public class SolrCLI implements CLIO {
             .withLongOpt("iterations")
             .create("i"),
         OptionBuilder
-            .withDescription("Save autoscaling shapshots at each step of simulated execution.")
+            .withDescription("Save autoscaling snapshots at each step of simulated execution.")
             .withArgName("DIR")
             .withLongOpt("saveSimulated")
             .hasArg()
             .create("ss"),
+        OptionBuilder
+            .withDescription("Execute a scenario from a file (and ignore all other options).")
+            .withArgName("FILE")
+            .hasArg()
+            .create("scenario"),
         OptionBuilder
             .withDescription("Turn on all options to get all available information.")
             .create("all")
@@ -949,6 +955,15 @@ public class SolrCLI implements CLIO {
 
   protected void runImpl(CommandLine cli) throws Exception {
     raiseLogLevelUnlessVerbose(cli);
+    if (cli.hasOption("scenario")) {
+      String data = IOUtils.toString(new FileInputStream(cli.getOptionValue("scenario")), "UTF-8");
+      try (SimScenario scenario = SimScenario.load(data)) {
+        scenario.verbose = verbose;
+        scenario.console = CLIO.getOutStream();
+        scenario.run();
+      }
+      return;
+    }
     SnapshotCloudManager cloudManager;
     AutoScalingConfig config = null;
     String configFile = cli.getOptionValue("a");
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
+import org.apache.solr.cloud.CloudUtil;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
+import org.junit.Test;
+
+/**
+ *
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
+public class TestSimScenario extends SimSolrCloudTestCase {
+
+  // simple scenario to test .autoAddReplicas trigger
+  String autoAddReplicasScenario =
+      "# standard comment\n" +
+      "// java comment\n" +
+      "create_cluster numNodes=2 // inline comment\n" +
+      "load_autoscaling json={'cluster-policy'+:+[{'replica'+:+'<3',+'shard'+:+'#EACH',+'collection'+:+'testCollection','node':'#ANY'}]}&defaultWaitFor=10\n" +
+      "solr_request /admin/collections?action=CREATE&autoAddReplicas=true&name=testCollection&numShards=2&replicationFactor=2&maxShardsPerNode=2\n" +
+      "wait_collection collection=testCollection&shards=2&replicas=2\n" +
+      "event_listener trigger=.auto_add_replicas&stage=SUCCEEDED\n" +
+      "kill_nodes node=${_random_node_}\n" +
+      "wait_event trigger=.auto_add_replicas&wait=60\n" +
+      "wait_collection collection=testCollection&shards=2&replicas=2\n" +
+      "save_snapshot path=${snapshotPath}\n";
+
+  @Test
+  public void testAutoAddReplicas() throws Exception {
+    String snapshotPath = createTempDir() + "/snapshot";
+    try (SimScenario scenario = SimScenario.load(autoAddReplicasScenario)) {
+      scenario.context.put("snapshotPath", snapshotPath);
+      scenario.run();
+    }
+    SnapshotCloudManager snapshotCloudManager = SnapshotCloudManager.readSnapshot(new File(snapshotPath));
+    CloudUtil.waitForState(snapshotCloudManager, "testCollection", 1, TimeUnit.SECONDS,
+        CloudUtil.clusterShape(2, 2));
+  }
+
+  String testSuggestionsScenario =
+      "create_cluster numNodes=2\n" +
+      "solr_request /admin/collections?action=CREATE&autoAddReplicas=true&name=testCollection&numShards=2&replicationFactor=2&maxShardsPerNode=2\n" +
+      "wait_collection collection=testCollection&shards=2&replicas=2\n" +
+      "ctx_set key=myNode&value=${_random_node_}\n" +
+      "solr_request /admin/collections?action=ADDREPLICA&collection=testCollection&shard=shard1&node=${myNode}\n" +
+      "solr_request /admin/collections?action=ADDREPLICA&collection=testCollection&shard=shard1&node=${myNode}\n" +
+      "loop_start iterations=${iterative}\n" +
+      "  calculate_suggestions\n" +
+      "  apply_suggestions\n" +
+      "  solr_request /admin/collections?action=ADDREPLICA&collection=testCollection&shard=shard1&node=${myNode}\n" +
+      "  solr_request /admin/collections?action=ADDREPLICA&collection=testCollection&shard=shard1&node=${myNode}\n" +
+      "loop_end\n" +
+      "loop_start iterations=${justCalc}\n" +
+      "  calculate_suggestions\n" +
+      "loop_end\n" +
+      "dump redact=true";
+
+  @Test
+  public void testSuggestions() throws Exception {
+    try (SimScenario scenario = SimScenario.load(testSuggestionsScenario)) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      PrintStream ps = new PrintStream(baos, true, Charset.forName("UTF-8"));
+      scenario.console = ps;
+      scenario.context.put("iterative", "0");
+      scenario.context.put("justCalc", "1");
+      scenario.run();
+      List<Suggester.SuggestionInfo> suggestions = (List<Suggester.SuggestionInfo>)scenario.context.get(SimScenario.SUGGESTIONS_CTX_PROP);
+      assertNotNull(suggestions);
+      assertEquals(suggestions.toString(), 1, suggestions.size());
+      // reconstruct the snapshot from the dump
+      Map<String, Object> snapshot = (Map<String, Object>)Utils.fromJSON(baos.toByteArray());
+      Map<String, Object> autoscalingState = (Map<String, Object>)snapshot.get(SnapshotCloudManager.AUTOSCALING_STATE_KEY);
+      assertNotNull(autoscalingState);
+      assertEquals(autoscalingState.toString(), 1, autoscalingState.size());
+      assertTrue(autoscalingState.toString(), autoscalingState.containsKey("suggestions"));
+      List<Map<String, Object>> snapSuggestions = (List<Map<String, Object>>)autoscalingState.get("suggestions");
+      assertEquals(snapSuggestions.toString(), 1, snapSuggestions.size());
+      // _loop_iter_ should be present and 0 (first iteration)
+      assertEquals(0, scenario.context.get(SimScenario.LOOP_ITER_PROP));
+    }
+    // try looping more times
+    try (SimScenario scenario = SimScenario.load(testSuggestionsScenario)) {
+      scenario.context.put("iterative", "10");
+      scenario.context.put("justCalc", "0");
+      scenario.run();
+      assertEquals(9, scenario.context.get(SimScenario.LOOP_ITER_PROP));
+    }
+  }
+
+  String indexingScenario =
+      "create_cluster numNodes=100\n" +
+      "solr_request /admin/collections?action=CREATE&autoAddReplicas=true&name=testCollection&numShards=2&replicationFactor=2&maxShardsPerNode=2\n" +
+      "wait_collection collection=testCollection&shards=2&replicas=2\n" +
+      "solr_request /admin/autoscaling?httpMethod=POST&stream.body=" +
+      "{'set-trigger':{'name':'indexSizeTrigger','event':'indexSize','waitFor':'10s','aboveDocs':1000,'enabled':true," +
+      "'actions':[{'name':'compute_plan','class':'solr.ComputePlanAction'},{'name':'execute_plan','class':'solr.ExecutePlanAction'}]}}\n" +
+      "event_listener trigger=indexSizeTrigger&stage=SUCCEEDED\n" +
+      "index_docs collection=testCollection&numDocs=3000\n" +
+      "run\n" +
+      "wait_event trigger=indexSizeTrigger&wait=60\n" +
+      "assert condition=not_null&key=_trigger_event_indexSizeTrigger\n" +
+      "assert condition=equals&key=_trigger_event_indexSizeTrigger/eventType&expected=INDEXSIZE\n" +
+      "assert condition=equals&key=_trigger_event_indexSizeTrigger/properties/requestedOps[0]/action&expected=SPLITSHARD\n" +
+      "wait_collection collection=testCollection&shards=6&withInactive=true&requireLeaders=false&replicas=2";
+
+  @Test
+  public void testIndexing() throws Exception {
+    try (SimScenario scenario = SimScenario.load(indexingScenario)) {
+      scenario.run();
+    }
+  }
+}
@@ -499,3 +499,143 @@ Number of iterations of the simulation loop. Default is 10.
 
 Results of the simulation contain the initial suggestions, suggestions at each step of the
 simulation and the final simulated state of the cluster.
+
+=== Simulation scenario tool
+The autoscaling command-line tool also supports executing end-to-end simulation scenarios consisting of
+several cluster- and collection-level operations and events.
+
+This tool can be invoked using `bin/solr autoscaling -scenario <FILE>`. All other command-line options are ignored in this mode.
+
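For example (the scenario file path here is only an illustration):

[source,bash]
----
bin/solr autoscaling -scenario /tmp/testScenario.txt
----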
+The file describing a scenario to test uses a simple plain-text (UTF-8 encoded) line-oriented format, where
+each line of text uses the following syntax:
+
+[source,text]
+----
+line := command whitespace params | '#'
+params := [ path, '?' ] key, '=', value { '&', key, '=', value } *
+----
+
+Keys and values additionally use the www-urlencoded format to avoid meta-characters and non-ASCII characters.
+
+The `params` part of the line closely follows a regular Solr params representation on purpose - in many cases
+the content of this part of the command is passed directly to the respective collection- or cluster-level API.
+
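For instance, the following line (taken verbatim from the examples below) fits this grammar; note the `+` characters, which URL-encode the spaces inside the JSON value:

[source,text]
----
load_autoscaling json={'cluster-policy'+:+[{'replica'+:+'<3',+'shard'+:+'#EACH',+'collection'+:+'testCollection','node':'#ANY'}]}&defaultWaitFor=10
----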
+==== Scenario context
+A scenario has a context, which is simply a map of key-value pairs. Before each command is executed the context is
+updated to contain the current values of the following properties:
+
+* `_random_node_` - randomly selected node name, or null if no node is live
+* `_overseer_leader_` - node name of the current Overseer leader node, or absent if there's no Overseer
+* `_live_nodes_` - a list of current live nodes, or absent if there are no live nodes
+* `_collections_` - a list of existing collections, or absent if there are no collections (or no live nodes)
+* `_suggestions_` - a list of autoscaling suggestions generated using the CALCULATE_SUGGESTIONS command.
+* `_responses_` - a list of SolrResponse-s resulting from SOLR_REQUEST commands.
+* `_loop_iter_` - current loop iteration, or absent outside of a loop.
+* `_trigger_event_<triggerName>` - last trigger event captured by WAIT_EVENT
+
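For example, a scenario can capture one of these properties under its own key for stable reuse across commands (a fragment from the examples below):

[source,text]
----
ctx_set key=myNode&value=${_random_node_}
solr_request /admin/collections?action=ADDREPLICA&collection=testCollection&shard=shard1&node=${myNode}
----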
+Command parameters support variable expansion using string values from the current context (non-string values, including numeric ones, are ignored)
+and from system properties, with the context values taking precedence if both are set.
+
+For example, assuming the system property `foo` is set to `bar`, the following command will load a snapshot from
+`/tmp/bar`:
+[source,text]
+----
+load_snapshot path=/tmp/${foo}
+----
+
+==== Scenario commands
+The following commands are supported (command names are case-insensitive, but params aren't):
+
+* `create_cluster numNodes=N[&disableMetricsHistory=false&timeSource=simTime:50]` - create a simulated cluster with N nodes.
+* `load_snapshot (path=/some/path | zkHost=ZK_CONNECT_STRING)` - create a simulated cluster from an autoscaling snapshot or from a live cluster.
+* `save_snapshot path=/some/path[&redact=false]` - save an autoscaling snapshot of the current simulated cluster state.
+* `calculate_suggestions` - calculate autoscaling suggestions based on the current cluster state and the policy.
+* `apply_suggestions` - apply previously calculated suggestions.
+* `kill_nodes (numNodes=N | nodes=node1,node2,...)` - kill a number of randomly selected nodes, or specific nodes.
+* `add_nodes numNodes=N` - add a number of new nodes.
+* `load_autoscaling (path=/some/path | json={...})` - load an `autoscaling.json` config from a path or from the supplied JSON string, and apply this config to the simulated cluster.
+* `loop_start [iterations=N]`, `loop_end` - iterate the commands enclosed in `loop_start` / `loop_end` N times, or until a loop abort is requested.
+* `set_op_delays op1=delayMs1&op2=delayMs2...` - set operation delays for specific collection commands to simulate slow execution.
+* `solr_request /admin/handler?httpMethod=POST&stream.body={'json':'body'}&other=params` - execute one of the SolrRequest types supported by `SimCloudManager`.
+* `run [time=60000]` - run the simulator for some time, allowing background tasks to execute (e.g. trigger event processing).
+* `wait_collection collection=test&shards=N&replicas=M[&withInactive=false&requireLeaders=true&wait=90]` - wait until the collection shape matches the criteria or the wait time elapses (in which case an error is thrown).
+* `event_listener trigger=triggerName&stage=SUCCEEDED[&beforeAction=foo | &afterAction=bar]` - prepare to listen for a specific trigger event.
+* `wait_event trigger=triggerName[&wait=90]` - wait until an event specified in `event_listener` is captured or the wait time elapses (in which case an error is thrown).
+* `ctx_set key=myKey&value=myValue` - set a key / value pair in the scenario's context.
+* `ctx_remove key=myKey` - remove a key / value pair from the scenario's context.
+* `dump [redact=false&withData=false&withStats=false&withSuggestions=false&withDiagnostics=false&withNodeState=false&withClusterState=false&withManagerState=false]` - dump the simulator state to the console.
+* `set_node_metrics nodeset=node1,node2...&aKey1=aValue1&aKey2=aValue2...` - set node metrics.
+* `set_shard_metrics collection=test&shard=shard1[&delta=false&divide=false]&aKey1=aValue1&aKey2=aValue2...` - set per-shard metrics, optionally expressed as a delta from existing values and optionally with the values divided across the existing replicas of a shard.
+* `index_docs numDocs=NNN[&start=XXX]` - simulate bulk indexing of a large number of documents.
+* `assert condition=(equals | not_equals | null | not_null)&(key=objectPath | value=myValue)[&expected=value]` - assert a condition. When `key` is specified it can be an object path to complex values present in the scenario's context.
+
+==== Example scenarios
+Example scenario testing the behavior of the `.autoAddReplicas` trigger:
+[source,text]
+----
+# standard comment
+// java comment
+create_cluster numNodes=2 // inline comment
+// load autoscaling config from a json string. Notice that the value must be URL-encoded
+load_autoscaling json={'cluster-policy'+:+[{'replica'+:+'<3',+'shard'+:+'#EACH',+'collection'+:+'testCollection','node':'#ANY'}]}&defaultWaitFor=10
+solr_request /admin/collections?action=CREATE&autoAddReplicas=true&name=testCollection&numShards=2&replicationFactor=2&maxShardsPerNode=2
+wait_collection collection=testCollection&shards=2&replicas=2
+// prepare a listener for trigger events and the processing state SUCCEEDED
+event_listener trigger=.auto_add_replicas&stage=SUCCEEDED
+// kill a random node
+kill_nodes node=${_random_node_}
+// wait for the listener to capture the event
+wait_event trigger=.auto_add_replicas&wait=60
+// the collection should have the same shape as before
+wait_collection collection=testCollection&shards=2&replicas=2
+save_snapshot path=${snapshotPath}
+----
+
+Example scenario testing the behavior of the `indexSize` trigger. Notice the use of a POST SolrRequest and the use of
+the `assert` command with an object path:
+
+[source,text]
+----
+create_cluster numNodes=100
+solr_request /admin/collections?action=CREATE&autoAddReplicas=true&name=testCollection&numShards=2&replicationFactor=2&maxShardsPerNode=2
+wait_collection collection=testCollection&shards=2&replicas=2
+// example of defining a trigger config
+solr_request /admin/autoscaling?httpMethod=POST&stream.body={'set-trigger':{'name':'indexSizeTrigger','event':'indexSize','waitFor':'10s','aboveDocs':1000,'enabled':true,'actions':[{'name':'compute_plan','class':'solr.ComputePlanAction'},{'name':'execute_plan','class':'solr.ExecutePlanAction'}]}}
+// prepare an event listener
+event_listener trigger=indexSizeTrigger&stage=SUCCEEDED
+// add documents
+index_docs collection=testCollection&numDocs=3000
+// run for 60 sec
+run
+// wait for a trigger event (as defined in the listener)
+wait_event trigger=indexSizeTrigger&wait=60
+// the event is stored in the context
+assert condition=not_null&key=_trigger_event_indexSizeTrigger
+assert condition=equals&key=_trigger_event_indexSizeTrigger/eventType&expected=INDEXSIZE
+assert condition=equals&key=_trigger_event_indexSizeTrigger/properties/requestedOps[0]/action&expected=SPLITSHARD
+wait_collection collection=testCollection&shards=6&withInactive=true&requireLeaders=false&replicas=2
+----
+
+Example scenario where context variables are used for conditional execution of loops. Depending on the values of
+`iterative` and `justCalc` the two loops will execute 0 or more times. Notice also how the scenario picks
+a random node and consistently adds replicas to it.
+
+[source,text]
+----
+create_cluster numNodes=2
+solr_request /admin/collections?action=CREATE&autoAddReplicas=true&name=testCollection&numShards=2&replicationFactor=2&maxShardsPerNode=10
+wait_collection collection=testCollection&shards=2&replicas=2
+ctx_set key=myNode&value=${_random_node_}
+solr_request /admin/collections?action=ADDREPLICA&collection=testCollection&shard=shard1&node=${myNode}
+solr_request /admin/collections?action=ADDREPLICA&collection=testCollection&shard=shard1&node=${myNode}
+loop_start iterations=${iterative}
+  calculate_suggestions
+  apply_suggestions
+  solr_request /admin/collections?action=ADDREPLICA&collection=testCollection&shard=shard1&node=${myNode}
+  solr_request /admin/collections?action=ADDREPLICA&collection=testCollection&shard=shard1&node=${myNode}
+loop_end
+loop_start iterations=${justCalc}
+  calculate_suggestions
+loop_end
+dump redact=true
+----
@@ -331,7 +331,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
       }
     }
 
-    if (cnt == retries) {
+    if (cnt == retries || rsp == null) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "Could not get remote info after many retries on NoHttpResponseException");
     }