diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java index ed24bf7880c..420d4f39645 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java @@ -130,6 +130,8 @@ public class AutoScaling { return new NodeAddedTrigger(name, props, coreContainer); case NODELOST: return new NodeLostTrigger(name, props, coreContainer); + case SEARCHRATE: + return new SearchRateTrigger(name, props, coreContainer); default: throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java index 44b35833713..6451c6778d4 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java @@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -35,6 +36,7 @@ import org.apache.solr.client.solrj.impl.SolrClientDataProvider; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.AutoScalingParams; import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.util.Pair; import org.apache.solr.core.CoreContainer; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -126,13 +128,27 @@ public class ComputePlanAction extends TriggerActionBase { } else { // collection || shard || replica -> ADDREPLICA suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA); - Set collections = new HashSet<>(); - // XXX improve this when AddReplicaSuggester supports coll_shard hint - hotReplicas.forEach(r -> collections.add(r.getCollection())); - hotShards.forEach((coll, shards) -> collections.add(coll)); - hotCollections.forEach((coll, rate) -> collections.add(coll)); - for (String coll : collections) { - suggester = suggester.hint(Policy.Suggester.Hint.COLL, coll); + Map> collShards = new HashMap<>(); + // AddReplicaSuggester needs a list of Pair(coll, shard) + hotReplicas.forEach(r -> collShards.computeIfAbsent(r.getCollection(), c -> new HashSet<>()).add(r.getShard())); + hotShards.forEach((coll, shards) -> collShards.computeIfAbsent(coll, c -> new HashSet<>()).addAll(shards.keySet())); + // if we only have hotCollections then use warmShards to pick ones to replicate + Map warmShards = (Map)event.getProperty(AutoScalingParams.WARM_SHARD); + hotCollections.forEach((coll, rate) -> { + Set shards = collShards.get(coll); + if (shards == null || shards.isEmpty()) { + String warmShard = warmShards.get(coll); + if (warmShard == null) { + log.warn("Got hot collection '" + coll + "' but no warm shard! Ignoring..."); + return; + } + collShards.computeIfAbsent(coll, s -> new HashSet<>()).add(warmShard); + } + }); + for (Map.Entry> e : collShards.entrySet()) { + for (String shard : e.getValue()) { + suggester = suggester.hint(Policy.Suggester.Hint.COLL_SHARD, new Pair<>(e.getKey(), shard)); + } } } break; diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java index 6692fda1220..f8a4eebf3ac 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java @@ -204,8 +204,10 @@ public class SearchRateTrigger extends TriggerBase { .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get())); Map> hotShards = new HashMap<>(); + Map warmShards = new HashMap<>(); List hotReplicas = new ArrayList<>(); collectionRates.forEach((coll, shardRates) -> { + final Object[] warmShard = new Object[2]; shardRates.forEach((sh, replicaRates) -> { double shardRate = replicaRates.stream() .map(r -> { @@ -216,6 +218,14 @@ public class SearchRateTrigger extends TriggerBase { return r; }) .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum(); + if (warmShard[0] == null) { + warmShard[0] = sh; + warmShard[1] = shardRate; + } + if (shardRate > (double)warmShard[1]) { + warmShard[0] = sh; + warmShard[1] = shardRate; + } if (waitForElapsed(coll + "." + sh, now, lastShardEvent) && (shardRate > rate) && (collection.equals(Policy.ANY) || collection.equals(coll)) && @@ -223,11 +233,12 @@ public class SearchRateTrigger extends TriggerBase { hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate); } }); + warmShards.put(coll, (String)warmShard[0]); }); Map hotCollections = new HashMap<>(); - collectionRates.forEach((coll, shardRates) -> { - double total = shardRates.entrySet().stream() + collectionRates.forEach((coll, shRates) -> { + double total = shRates.entrySet().stream() .mapToDouble(e -> e.getValue().stream() .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum(); if (waitForElapsed(coll, now, lastCollectionEvent) && @@ -243,7 +254,7 @@ public class SearchRateTrigger extends TriggerBase { // generate event - if (processor.process(new SearchRateEvent(getName(), now, hotNodes, hotCollections, hotShards, hotReplicas))) { + if (processor.process(new SearchRateEvent(getName(), now, hotNodes, hotCollections, hotShards, hotReplicas, warmShards))) { // update lastEvent times hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now)); hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now)); @@ -266,12 +277,14 @@ public class SearchRateTrigger extends TriggerBase { public static class SearchRateEvent extends TriggerEvent { public SearchRateEvent(String source, long eventTime, Map hotNodes, Map hotCollections, - Map> hotShards, List hotReplicas) { + Map> hotShards, List hotReplicas, + Map warmShards) { super(TriggerEventType.SEARCHRATE, source, eventTime, null); properties.put(AutoScalingParams.COLLECTION, hotCollections); properties.put(AutoScalingParams.SHARD, hotShards); properties.put(AutoScalingParams.REPLICA, hotReplicas); properties.put(AutoScalingParams.NODE, hotNodes); + properties.put(AutoScalingParams.WARM_SHARD, warmShards); } } } \ No newline at end of file diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java index b0bf4e67545..93a13b056e0 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java @@ -38,6 +38,7 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.util.LogLevel; @@ -48,6 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA; /** @@ -115,6 +117,12 @@ public class ComputePlanActionTest extends SolrCloudTestCase { // expected if testNodeWithMultipleReplicasLost hasn't run already } + try { + CollectionAdminRequest.deleteCollection("testSearchRate").process(solrClient); + } catch (Exception e) { + // expected if testSearchRate hasn't run already + } + String setClusterPolicyCommand = "{" + " 'set-cluster-policy': [" + " {'cores':'<10', 'node':'#ANY'}," + @@ -343,6 +351,51 @@ public class ComputePlanActionTest extends SolrCloudTestCase { assertEquals("Unexpected node in computed operation", runner.getNodeName(), nodeAdded); } + @Test + public void testSearchRate() throws Exception { + // create an empty node + cluster.startJettySolrRunner(); + CloudSolrClient solrClient = cluster.getSolrClient(); + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : 'search_rate_trigger'," + + "'event' : 'searchRate'," + + "'waitFor' : '1s'," + + "'enabled' : true," + + "'rate' : 1.0," + + "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," + + "{'name':'test','class':'" + ComputePlanActionTest.AssertingTriggerAction.class.getName() + "'}]" + + "}}"; + SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand); + NamedList response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("testSearchRate", + "conf",1, 1); + create.process(solrClient); + + waitForState("Timed out waiting for replicas of new collection to be active", + "testSearchRate", (liveNodes, collectionState) -> collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes))); + + // generate some dummy traffic + SolrParams query = params(CommonParams.Q, "*:*"); + for (int i = 0; i < 500; i++) { + solrClient.query("testSearchRate", query); + } + + assertTrue("Trigger was not fired", triggerFiredLatch.await(10, TimeUnit.SECONDS)); + assertTrue(fired.get()); + Map context = actionContextPropsRef.get(); + assertNotNull(context); + List operations = (List) context.get("operations"); + assertNotNull("The operations computed by ComputePlanAction should not be null", operations); + assertEquals("ComputePlanAction should have computed exactly 1 operation", 1, operations.size()); + SolrRequest request = operations.get(0); + SolrParams params = request.getParams(); + assertEquals("Expected ADDREPLICA action after exceeding searchRate", ADDREPLICA, CollectionParams.CollectionAction.get(params.get("action"))); + + } + public static class AssertingTriggerAction implements TriggerAction { @Override diff --git a/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java index f0c3d779bed..b78e23ef584 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java @@ -41,6 +41,7 @@ public interface AutoScalingParams { String SHARD = "shard"; String REPLICA = "replica"; String NODE = "node"; + String WARM_SHARD = "warmShard"; String HANDLER = "handler"; String RATE = "rate"; String REMOVE_LISTENERS = "removeListeners";