mirror of https://github.com/apache/lucene.git
SOLR-11072: Fix searchRate trigger hookup to ComputePlanAction. Make
the trigger more robust by supporting host collections with warm shards. Add unit test.
This commit is contained in:
parent
8ef83bff12
commit
6fb2803bca
|
@ -130,6 +130,8 @@ public class AutoScaling {
|
||||||
return new NodeAddedTrigger(name, props, coreContainer);
|
return new NodeAddedTrigger(name, props, coreContainer);
|
||||||
case NODELOST:
|
case NODELOST:
|
||||||
return new NodeLostTrigger(name, props, coreContainer);
|
return new NodeLostTrigger(name, props, coreContainer);
|
||||||
|
case SEARCHRATE:
|
||||||
|
return new SearchRateTrigger(name, props, coreContainer);
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
|
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -35,6 +36,7 @@ import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.AutoScalingParams;
|
import org.apache.solr.common.params.AutoScalingParams;
|
||||||
import org.apache.solr.common.params.CollectionParams;
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
|
import org.apache.solr.common.util.Pair;
|
||||||
import org.apache.solr.core.CoreContainer;
|
import org.apache.solr.core.CoreContainer;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -126,13 +128,27 @@ public class ComputePlanAction extends TriggerActionBase {
|
||||||
} else {
|
} else {
|
||||||
// collection || shard || replica -> ADDREPLICA
|
// collection || shard || replica -> ADDREPLICA
|
||||||
suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA);
|
suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA);
|
||||||
Set<String> collections = new HashSet<>();
|
Map<String, Set<String>> collShards = new HashMap<>();
|
||||||
// XXX improve this when AddReplicaSuggester supports coll_shard hint
|
// AddReplicaSuggester needs a list of Pair(coll, shard)
|
||||||
hotReplicas.forEach(r -> collections.add(r.getCollection()));
|
hotReplicas.forEach(r -> collShards.computeIfAbsent(r.getCollection(), c -> new HashSet<>()).add(r.getShard()));
|
||||||
hotShards.forEach((coll, shards) -> collections.add(coll));
|
hotShards.forEach((coll, shards) -> collShards.computeIfAbsent(coll, c -> new HashSet<>()).addAll(shards.keySet()));
|
||||||
hotCollections.forEach((coll, rate) -> collections.add(coll));
|
// if we only have hotCollections then use warmShards to pick ones to replicate
|
||||||
for (String coll : collections) {
|
Map<String, String> warmShards = (Map<String, String>)event.getProperty(AutoScalingParams.WARM_SHARD);
|
||||||
suggester = suggester.hint(Policy.Suggester.Hint.COLL, coll);
|
hotCollections.forEach((coll, rate) -> {
|
||||||
|
Set<String> shards = collShards.get(coll);
|
||||||
|
if (shards == null || shards.isEmpty()) {
|
||||||
|
String warmShard = warmShards.get(coll);
|
||||||
|
if (warmShard == null) {
|
||||||
|
log.warn("Got hot collection '" + coll + "' but no warm shard! Ignoring...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
collShards.computeIfAbsent(coll, s -> new HashSet<>()).add(warmShard);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
for (Map.Entry<String, Set<String>> e : collShards.entrySet()) {
|
||||||
|
for (String shard : e.getValue()) {
|
||||||
|
suggester = suggester.hint(Policy.Suggester.Hint.COLL_SHARD, new Pair<>(e.getKey(), shard));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -204,8 +204,10 @@ public class SearchRateTrigger extends TriggerBase {
|
||||||
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get()));
|
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get()));
|
||||||
|
|
||||||
Map<String, Map<String, Double>> hotShards = new HashMap<>();
|
Map<String, Map<String, Double>> hotShards = new HashMap<>();
|
||||||
|
Map<String, String> warmShards = new HashMap<>();
|
||||||
List<ReplicaInfo> hotReplicas = new ArrayList<>();
|
List<ReplicaInfo> hotReplicas = new ArrayList<>();
|
||||||
collectionRates.forEach((coll, shardRates) -> {
|
collectionRates.forEach((coll, shardRates) -> {
|
||||||
|
final Object[] warmShard = new Object[2];
|
||||||
shardRates.forEach((sh, replicaRates) -> {
|
shardRates.forEach((sh, replicaRates) -> {
|
||||||
double shardRate = replicaRates.stream()
|
double shardRate = replicaRates.stream()
|
||||||
.map(r -> {
|
.map(r -> {
|
||||||
|
@ -216,6 +218,14 @@ public class SearchRateTrigger extends TriggerBase {
|
||||||
return r;
|
return r;
|
||||||
})
|
})
|
||||||
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
|
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
|
||||||
|
if (warmShard[0] == null) {
|
||||||
|
warmShard[0] = sh;
|
||||||
|
warmShard[1] = shardRate;
|
||||||
|
}
|
||||||
|
if (shardRate > (double)warmShard[1]) {
|
||||||
|
warmShard[0] = sh;
|
||||||
|
warmShard[1] = shardRate;
|
||||||
|
}
|
||||||
if (waitForElapsed(coll + "." + sh, now, lastShardEvent) &&
|
if (waitForElapsed(coll + "." + sh, now, lastShardEvent) &&
|
||||||
(shardRate > rate) &&
|
(shardRate > rate) &&
|
||||||
(collection.equals(Policy.ANY) || collection.equals(coll)) &&
|
(collection.equals(Policy.ANY) || collection.equals(coll)) &&
|
||||||
|
@ -223,11 +233,12 @@ public class SearchRateTrigger extends TriggerBase {
|
||||||
hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
|
hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
warmShards.put(coll, (String)warmShard[0]);
|
||||||
});
|
});
|
||||||
|
|
||||||
Map<String, Double> hotCollections = new HashMap<>();
|
Map<String, Double> hotCollections = new HashMap<>();
|
||||||
collectionRates.forEach((coll, shardRates) -> {
|
collectionRates.forEach((coll, shRates) -> {
|
||||||
double total = shardRates.entrySet().stream()
|
double total = shRates.entrySet().stream()
|
||||||
.mapToDouble(e -> e.getValue().stream()
|
.mapToDouble(e -> e.getValue().stream()
|
||||||
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
|
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
|
||||||
if (waitForElapsed(coll, now, lastCollectionEvent) &&
|
if (waitForElapsed(coll, now, lastCollectionEvent) &&
|
||||||
|
@ -243,7 +254,7 @@ public class SearchRateTrigger extends TriggerBase {
|
||||||
|
|
||||||
// generate event
|
// generate event
|
||||||
|
|
||||||
if (processor.process(new SearchRateEvent(getName(), now, hotNodes, hotCollections, hotShards, hotReplicas))) {
|
if (processor.process(new SearchRateEvent(getName(), now, hotNodes, hotCollections, hotShards, hotReplicas, warmShards))) {
|
||||||
// update lastEvent times
|
// update lastEvent times
|
||||||
hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
|
hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
|
||||||
hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
|
hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
|
||||||
|
@ -266,12 +277,14 @@ public class SearchRateTrigger extends TriggerBase {
|
||||||
public static class SearchRateEvent extends TriggerEvent {
|
public static class SearchRateEvent extends TriggerEvent {
|
||||||
public SearchRateEvent(String source, long eventTime, Map<String, Double> hotNodes,
|
public SearchRateEvent(String source, long eventTime, Map<String, Double> hotNodes,
|
||||||
Map<String, Double> hotCollections,
|
Map<String, Double> hotCollections,
|
||||||
Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas) {
|
Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas,
|
||||||
|
Map<String, String> warmShards) {
|
||||||
super(TriggerEventType.SEARCHRATE, source, eventTime, null);
|
super(TriggerEventType.SEARCHRATE, source, eventTime, null);
|
||||||
properties.put(AutoScalingParams.COLLECTION, hotCollections);
|
properties.put(AutoScalingParams.COLLECTION, hotCollections);
|
||||||
properties.put(AutoScalingParams.SHARD, hotShards);
|
properties.put(AutoScalingParams.SHARD, hotShards);
|
||||||
properties.put(AutoScalingParams.REPLICA, hotReplicas);
|
properties.put(AutoScalingParams.REPLICA, hotReplicas);
|
||||||
properties.put(AutoScalingParams.NODE, hotNodes);
|
properties.put(AutoScalingParams.NODE, hotNodes);
|
||||||
|
properties.put(AutoScalingParams.WARM_SHARD, warmShards);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -38,6 +38,7 @@ import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.CollectionParams;
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
|
import org.apache.solr.common.params.CommonParams;
|
||||||
import org.apache.solr.common.params.SolrParams;
|
import org.apache.solr.common.params.SolrParams;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.util.LogLevel;
|
import org.apache.solr.util.LogLevel;
|
||||||
|
@ -48,6 +49,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
|
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -115,6 +117,12 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
|
||||||
// expected if testNodeWithMultipleReplicasLost hasn't run already
|
// expected if testNodeWithMultipleReplicasLost hasn't run already
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
CollectionAdminRequest.deleteCollection("testSearchRate").process(solrClient);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// expected if testSearchRate hasn't run already
|
||||||
|
}
|
||||||
|
|
||||||
String setClusterPolicyCommand = "{" +
|
String setClusterPolicyCommand = "{" +
|
||||||
" 'set-cluster-policy': [" +
|
" 'set-cluster-policy': [" +
|
||||||
" {'cores':'<10', 'node':'#ANY'}," +
|
" {'cores':'<10', 'node':'#ANY'}," +
|
||||||
|
@ -343,6 +351,51 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
|
||||||
assertEquals("Unexpected node in computed operation", runner.getNodeName(), nodeAdded);
|
assertEquals("Unexpected node in computed operation", runner.getNodeName(), nodeAdded);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSearchRate() throws Exception {
|
||||||
|
// create an empty node
|
||||||
|
cluster.startJettySolrRunner();
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'search_rate_trigger'," +
|
||||||
|
"'event' : 'searchRate'," +
|
||||||
|
"'waitFor' : '1s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'rate' : 1.0," +
|
||||||
|
"'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
|
||||||
|
"{'name':'test','class':'" + ComputePlanActionTest.AssertingTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("testSearchRate",
|
||||||
|
"conf",1, 1);
|
||||||
|
create.process(solrClient);
|
||||||
|
|
||||||
|
waitForState("Timed out waiting for replicas of new collection to be active",
|
||||||
|
"testSearchRate", (liveNodes, collectionState) -> collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes)));
|
||||||
|
|
||||||
|
// generate some dummy traffic
|
||||||
|
SolrParams query = params(CommonParams.Q, "*:*");
|
||||||
|
for (int i = 0; i < 500; i++) {
|
||||||
|
solrClient.query("testSearchRate", query);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue("Trigger was not fired", triggerFiredLatch.await(10, TimeUnit.SECONDS));
|
||||||
|
assertTrue(fired.get());
|
||||||
|
Map context = actionContextPropsRef.get();
|
||||||
|
assertNotNull(context);
|
||||||
|
List<SolrRequest> operations = (List<SolrRequest>) context.get("operations");
|
||||||
|
assertNotNull("The operations computed by ComputePlanAction should not be null", operations);
|
||||||
|
assertEquals("ComputePlanAction should have computed exactly 1 operation", 1, operations.size());
|
||||||
|
SolrRequest request = operations.get(0);
|
||||||
|
SolrParams params = request.getParams();
|
||||||
|
assertEquals("Expected ADDREPLICA action after exceeding searchRate", ADDREPLICA, CollectionParams.CollectionAction.get(params.get("action")));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public static class AssertingTriggerAction implements TriggerAction {
|
public static class AssertingTriggerAction implements TriggerAction {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -41,6 +41,7 @@ public interface AutoScalingParams {
|
||||||
String SHARD = "shard";
|
String SHARD = "shard";
|
||||||
String REPLICA = "replica";
|
String REPLICA = "replica";
|
||||||
String NODE = "node";
|
String NODE = "node";
|
||||||
|
String WARM_SHARD = "warmShard";
|
||||||
String HANDLER = "handler";
|
String HANDLER = "handler";
|
||||||
String RATE = "rate";
|
String RATE = "rate";
|
||||||
String REMOVE_LISTENERS = "removeListeners";
|
String REMOVE_LISTENERS = "removeListeners";
|
||||||
|
|
Loading…
Reference in New Issue