From cda1d2ee1a326240aa4f9ae9636cf3f927a82f1a Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Mon, 18 Sep 2017 12:21:44 -0700 Subject: [PATCH] Revert "SOLR-11072: Implement trigger for searchRate event type." --- .../cloud/autoscaling/SearchRateTrigger.java | 277 ------------------ .../autoscaling/SearchRateTriggerTest.java | 167 ----------- 2 files changed, 444 deletions(-) delete mode 100644 solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java delete mode 100644 solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java deleted file mode 100644 index 6692fda1220..00000000000 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.solr.cloud.autoscaling; - -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import com.google.common.util.concurrent.AtomicDouble; -import org.apache.solr.client.solrj.cloud.autoscaling.Policy; -import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; -import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.impl.SolrClientDataProvider; -import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider; -import org.apache.solr.common.SolrException; -import org.apache.solr.common.params.AutoScalingParams; -import org.apache.solr.core.CoreContainer; -import org.apache.solr.metrics.SolrCoreMetricManager; -import org.apache.solr.util.TimeSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Trigger for the {@link org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType#SEARCHRATE} event. - */ -public class SearchRateTrigger extends TriggerBase { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private final TimeSource timeSource; - private final CoreContainer container; - private final String handler; - private final String collection; - private final String shard; - private final String node; - private final double rate; - private final Map lastCollectionEvent = new ConcurrentHashMap<>(); - private final Map lastNodeEvent = new ConcurrentHashMap<>(); - private final Map lastShardEvent = new ConcurrentHashMap<>(); - private final Map lastReplicaEvent = new ConcurrentHashMap<>(); - private final Map state = new HashMap<>(); - - public SearchRateTrigger(String name, Map properties, CoreContainer container) { - super(TriggerEventType.SEARCHRATE, name, properties, container.getResourceLoader(), container.getZkController().getZkClient()); - this.timeSource = TimeSource.CURRENT_TIME; - this.container = container; - this.state.put("lastCollectionEvent", lastCollectionEvent); - this.state.put("lastNodeEvent", lastNodeEvent); - this.state.put("lastShardEvent", lastShardEvent); - this.state.put("lastReplicaEvent", lastReplicaEvent); - - // parse config options - collection = (String)properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY); - shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY); - if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) { - throw new IllegalArgumentException("When 'shard' is other than #ANY collection name must be also other than #ANY"); - } - node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY); - handler = (String)properties.getOrDefault(AutoScalingParams.HANDLER, "/select"); - - if (properties.get("rate") == null) { - throw new IllegalArgumentException("No 'rate' specified in configuration"); - } - String rateString = String.valueOf(properties.get("rate")); - try { - rate = Double.parseDouble(rateString); - } catch (Exception e) { - throw new IllegalArgumentException("Invalid 'rate' configuration value: '" + rateString + "'", e); - } - } - - @Override - protected Map getState() { - return state; - } - - @Override - protected void setState(Map state) { - lastCollectionEvent.clear(); - lastNodeEvent.clear(); - lastShardEvent.clear(); - lastReplicaEvent.clear(); - Map collTimes = (Map)state.get("lastCollectionEvent"); - if (collTimes != null) { - lastCollectionEvent.putAll(collTimes); - } - Map nodeTimes = (Map)state.get("lastNodeEvent"); - if (nodeTimes != null) { - lastNodeEvent.putAll(nodeTimes); - } - Map shardTimes = (Map)state.get("lastShardEvent"); - if (shardTimes != null) { - lastShardEvent.putAll(shardTimes); - } - Map replicaTimes = (Map)state.get("lastReplicaEvent"); - if (replicaTimes != null) { - lastReplicaEvent.putAll(replicaTimes); - } - } - - @Override - public void restoreState(AutoScaling.Trigger old) { - assert old.isClosed(); - if (old instanceof SearchRateTrigger) { - SearchRateTrigger that = (SearchRateTrigger)old; - assert this.name.equals(that.name); - this.lastCollectionEvent.clear(); - this.lastNodeEvent.clear(); - this.lastShardEvent.clear(); - this.lastReplicaEvent.clear(); - this.lastCollectionEvent.putAll(that.lastCollectionEvent); - this.lastNodeEvent.putAll(that.lastNodeEvent); - this.lastShardEvent.putAll(that.lastShardEvent); - this.lastReplicaEvent.putAll(that.lastReplicaEvent); - } else { - throw new SolrException(SolrException.ErrorCode.INVALID_STATE, - "Unable to restore state from an unknown type of trigger"); - } - - } - - @Override - public void run() { - AutoScaling.TriggerEventProcessor processor = processorRef.get(); - if (processor == null) { - return; - } - - Map>> collectionRates = new HashMap<>(); - Map nodeRates = new HashMap<>(); - - try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder() - .withClusterStateProvider(new ZkClientClusterStateProvider(container.getZkController().getZkStateReader())) - .build()) { - - SolrClientDataProvider dataProvider = new SolrClientDataProvider(cloudSolrClient); - - for (String node : dataProvider.getNodes()) { - Map metricTags = new HashMap<>(); - // coll, shard, replica - Map>> infos = dataProvider.getReplicaInfo(node, Collections.emptyList()); - infos.forEach((coll, shards) -> { - shards.forEach((sh, replicas) -> { - replicas.forEach(replica -> { - // we have to translate to the metrics registry name, which uses "_replica_nN" as suffix - String replicaName = SolrCoreMetricManager.parseReplicaName(coll, replica.getCore()); - if (replicaName == null) { // should never happen??? - replicaName = replica.getName(); // which is actually coreNode name... - } - String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null); - String tag = "metrics:" + registry - + ":QUERY." + handler + ".requestTimes:1minRate"; - metricTags.put(tag, replica); - }); - }); - }); - Map rates = dataProvider.getNodeValues(node, metricTags.keySet()); - rates.forEach((tag, rate) -> { - ReplicaInfo info = metricTags.get(tag); - if (info == null) { - log.warn("Missing replica info for response tag " + tag); - } else { - Map> perCollection = collectionRates.computeIfAbsent(info.getCollection(), s -> new HashMap<>()); - List perShard = perCollection.computeIfAbsent(info.getShard(), s -> new ArrayList<>()); - info.getVariables().put(AutoScalingParams.RATE, rate); - perShard.add(info); - AtomicDouble perNode = nodeRates.computeIfAbsent(node, s -> new AtomicDouble()); - perNode.addAndGet((Double)rate); - } - }); - } - } catch (IOException e) { - log.warn("Exception getting node values", e); - return; - } - - long now = timeSource.getTime(); - // check for exceeded rates and filter out those with less than waitFor from previous events - Map hotNodes = nodeRates.entrySet().stream() - .filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey())) - .filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent)) - .filter(entry -> entry.getValue().get() > rate) - .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get())); - - Map> hotShards = new HashMap<>(); - List hotReplicas = new ArrayList<>(); - collectionRates.forEach((coll, shardRates) -> { - shardRates.forEach((sh, replicaRates) -> { - double shardRate = replicaRates.stream() - .map(r -> { - if (waitForElapsed(r.getCollection() + "." + r.getCore(), now, lastReplicaEvent) && - ((Double)r.getVariable(AutoScalingParams.RATE) > rate)) { - hotReplicas.add(r); - } - return r; - }) - .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum(); - if (waitForElapsed(coll + "." + sh, now, lastShardEvent) && - (shardRate > rate) && - (collection.equals(Policy.ANY) || collection.equals(coll)) && - (shard.equals(Policy.ANY) || shard.equals(sh))) { - hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate); - } - }); - }); - - Map hotCollections = new HashMap<>(); - collectionRates.forEach((coll, shardRates) -> { - double total = shardRates.entrySet().stream() - .mapToDouble(e -> e.getValue().stream() - .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum(); - if (waitForElapsed(coll, now, lastCollectionEvent) && - (total > rate) && - (collection.equals(Policy.ANY) || collection.equals(coll))) { - hotCollections.put(coll, total); - } - }); - - if (hotCollections.isEmpty() && hotShards.isEmpty() && hotReplicas.isEmpty() && hotNodes.isEmpty()) { - return; - } - - // generate event - - if (processor.process(new SearchRateEvent(getName(), now, hotNodes, hotCollections, hotShards, hotReplicas))) { - // update lastEvent times - hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now)); - hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now)); - hotShards.entrySet().forEach(e -> e.getValue() - .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now))); - hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now)); - } - } - - private boolean waitForElapsed(String name, long now, Map lastEventMap) { - Long lastTime = lastEventMap.computeIfAbsent(name, s -> now); - long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS); - log.info("name=" + name + ", lastTime=" + lastTime + ", elapsed=" + elapsed); - if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) { - return false; - } - return true; - } - - public static class SearchRateEvent extends TriggerEvent { - public SearchRateEvent(String source, long eventTime, Map hotNodes, - Map hotCollections, - Map> hotShards, List hotReplicas) { - super(TriggerEventType.SEARCHRATE, source, eventTime, null); - properties.put(AutoScalingParams.COLLECTION, hotCollections); - properties.put(AutoScalingParams.SHARD, hotShards); - properties.put(AutoScalingParams.REPLICA, hotReplicas); - properties.put(AutoScalingParams.NODE, hotNodes); - } - } -} \ No newline at end of file diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java deleted file mode 100644 index 0719c2278b6..00000000000 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java +++ /dev/null @@ -1,167 +0,0 @@ -package org.apache.solr.cloud.autoscaling; - -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; -import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.cloud.SolrCloudTestCase; -import org.apache.solr.common.params.AutoScalingParams; -import org.apache.solr.common.params.CommonParams; -import org.apache.solr.common.params.SolrParams; -import org.apache.solr.core.CoreContainer; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * - */ -public class SearchRateTriggerTest extends SolrCloudTestCase { - - private static final String PREFIX = SearchRateTriggerTest.class.getSimpleName() + "-"; - private static final String COLL1 = PREFIX + "collection1"; - private static final String COLL2 = PREFIX + "collection2"; - - private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> { - fail("Did not expect the listener to fire on first run!"); - return true; - }; - - @BeforeClass - public static void setupCluster() throws Exception { - configureCluster(4) - .addConfig("conf", configset("cloud-minimal")) - .configure(); - CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1, - "conf", 2, 2); - CloudSolrClient solrClient = cluster.getSolrClient(); - create.setMaxShardsPerNode(1); - create.process(solrClient); - create = CollectionAdminRequest.createCollection(COLL2, - "conf", 2, 2); - create.setMaxShardsPerNode(1); - create.process(solrClient); - } - - @Test - public void testTrigger() throws Exception { - double rate = 1.0; - CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer(); - URL baseUrl = cluster.getJettySolrRunners().get(1).getBaseUrl(); - long waitForSeconds = 5 + random().nextInt(5); - Map props = createTriggerProps(waitForSeconds, rate); - final List events = new ArrayList<>(); - CloudSolrClient solrClient = cluster.getSolrClient(); - - try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger", props, container)) { - trigger.setProcessor(noFirstRunProcessor); - trigger.run(); - trigger.setProcessor(event -> events.add(event)); - - // generate replica traffic - String coreName = container.getLoadedCoreNames().iterator().next(); - String url = baseUrl.toString() + "/" + coreName; - try (HttpSolrClient simpleClient = new HttpSolrClient.Builder(url).build()) { - SolrParams query = params(CommonParams.Q, "*:*", CommonParams.DISTRIB, "false"); - for (int i = 0; i < 200; i++) { - simpleClient.query(query); - } - trigger.run(); - // waitFor delay - assertEquals(0, events.size()); - Thread.sleep(waitForSeconds * 1000 + 2000); - // should generate replica event - trigger.run(); - assertEquals(1, events.size()); - TriggerEvent event = events.get(0); - assertEquals(TriggerEventType.SEARCHRATE, event.eventType); - List infos = (List)event.getProperty(AutoScalingParams.REPLICA); - assertEquals(1, infos.size()); - ReplicaInfo info = infos.get(0); - assertEquals(coreName, info.getCore()); - assertTrue((Double)info.getVariable(AutoScalingParams.RATE) > rate); - } - // close that jetty to remove the violation - alternatively wait for 1 min... - cluster.stopJettySolrRunner(1); - events.clear(); - SolrParams query = params(CommonParams.Q, "*:*"); - for (int i = 0; i < 500; i++) { - solrClient.query(COLL1, query); - } - Thread.sleep(waitForSeconds * 1000 + 2000); - trigger.run(); - // should generate collection event - assertEquals(1, events.size()); - TriggerEvent event = events.get(0); - Map hotCollections = (Map)event.getProperty(AutoScalingParams.COLLECTION); - assertEquals(1, hotCollections.size()); - Double Rate = hotCollections.get(COLL1); - assertNotNull(Rate); - assertTrue(Rate > rate); - events.clear(); - - for (int i = 0; i < 1000; i++) { - solrClient.query(COLL2, query); - } - Thread.sleep(waitForSeconds * 1000 + 2000); - trigger.run(); - // should generate node and collection event but not for COLL2 because of waitFor - assertEquals(1, events.size()); - event = events.get(0); - Map hotNodes = (Map)event.getProperty(AutoScalingParams.NODE); - assertEquals(3, hotNodes.size()); - hotNodes.forEach((n, r) -> assertTrue(n, r > rate)); - hotCollections = (Map)event.getProperty(AutoScalingParams.COLLECTION); - assertEquals(2, hotCollections.size()); - Rate = hotCollections.get(COLL1); - assertNotNull(Rate); - Rate = hotCollections.get(COLL2); - assertNotNull(Rate); - - events.clear(); - // assert that waitFor prevents new events from being generated - trigger.run(); - // should not generate any events - assertEquals(0, events.size()); - - Thread.sleep(waitForSeconds * 1000 + 2000); - trigger.run(); - // should generate node and collection event - assertEquals(1, events.size()); - hotCollections = (Map)event.getProperty(AutoScalingParams.COLLECTION); - assertEquals(2, hotCollections.size()); - Rate = hotCollections.get(COLL1); - assertNotNull(Rate); - Rate = hotCollections.get(COLL2); - assertNotNull(Rate); - hotNodes = (Map)event.getProperty(AutoScalingParams.NODE); - assertEquals(3, hotNodes.size()); - hotNodes.forEach((n, r) -> assertTrue(n, r > rate)); - } - } - - private Map createTriggerProps(long waitForSeconds, double rate) { - Map props = new HashMap<>(); - props.put("rate", rate); - props.put("event", "searchRate"); - props.put("waitFor", waitForSeconds); - props.put("enabled", true); - List> actions = new ArrayList<>(3); - Map map = new HashMap<>(2); - map.put("name", "compute_plan"); - map.put("class", "solr.ComputePlanAction"); - actions.add(map); - map = new HashMap<>(2); - map.put("name", "execute_plan"); - map.put("class", "solr.ExecutePlanAction"); - actions.add(map); - props.put("actions", actions); - return props; - } -}