From 6008b186a247a7860223df877e7c7965a68a18a7 Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Thu, 2 Nov 2017 16:29:48 +0100 Subject: [PATCH] SOLR-11072: Fix creation of searchRate triggers via API, add a unit test, other minor edits. --- .../solr/cloud/autoscaling/AutoScaling.java | 2 + .../cloud/autoscaling/ScheduledTriggers.java | 5 + .../cloud/autoscaling/SearchRateTrigger.java | 36 +++++- .../autoscaling/TriggerIntegrationTest.java | 104 +++++++++++++++++- .../cloud/autoscaling/AutoScalingConfig.java | 15 +++ 5 files changed, 157 insertions(+), 5 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java index 98f4927d579..e61536bd5f0 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java @@ -153,6 +153,8 @@ public class AutoScaling { return new NodeAddedTrigger(name, props, loader, dataProvider); case NODELOST: return new NodeLostTrigger(name, props, loader, dataProvider); + case SEARCHRATE: + return new SearchRateTrigger(name, props, loader, dataProvider); default: throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java index 908a9610d76..163183eda1d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -365,6 +366,8 @@ public class ScheduledTriggers implements Closeable { if (stateManager.hasData(statePath)) { stateManager.removeData(statePath, -1); } + } catch (NoSuchElementException e) { + // already removed by someone else } catch (Exception e) { log.warn("Failed to remove state for removed trigger " + statePath, e); } @@ -378,6 +381,8 @@ public class ScheduledTriggers implements Closeable { ops.add(Op.delete(eventsPath, -1)); stateManager.multi(ops); } + } catch (NoSuchElementException e) { + // already removed by someone else } catch (Exception e) { log.warn("Failed to remove events for removed trigger " + eventsPath, e); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java index a4dffa38aa3..0c6ffd49913 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import com.google.common.util.concurrent.AtomicDouble; @@ -72,7 +73,7 @@ public class SearchRateTrigger extends TriggerBase { collection = (String)properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY); shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY); if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) { - throw new IllegalArgumentException("When 'shard' is other than #ANY collection name must be also other than #ANY"); + throw new IllegalArgumentException("When 'shard' is other than #ANY then collection name must be also other than #ANY"); } node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY); handler = (String)properties.getOrDefault(AutoScalingParams.HANDLER, "/select"); @@ -231,7 +232,36 @@ public class SearchRateTrigger extends TriggerBase { // generate event - if (processor.process(new SearchRateEvent(getName(), now, hotNodes, hotCollections, hotShards, hotReplicas))) { + // find the earliest time when a condition was exceeded + final AtomicLong eventTime = new AtomicLong(now); + hotCollections.forEach((c, r) -> { + long time = lastCollectionEvent.get(c); + if (eventTime.get() > time) { + eventTime.set(time); + } + }); + hotShards.forEach((c, shards) -> { + shards.forEach((s, r) -> { + long time = lastShardEvent.get(c + "." + s); + if (eventTime.get() > time) { + eventTime.set(time); + } + }); + }); + hotReplicas.forEach(r -> { + long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore()); + if (eventTime.get() > time) { + eventTime.set(time); + } + }); + hotNodes.forEach((n, r) -> { + long time = lastNodeEvent.get(n); + if (eventTime.get() > time) { + eventTime.set(time); + } + }); + + if (processor.process(new SearchRateEvent(getName(), eventTime.get(), hotNodes, hotCollections, hotShards, hotReplicas))) { // update lastEvent times hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now)); hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now)); @@ -244,7 +274,7 @@ public class SearchRateTrigger extends TriggerBase { private boolean waitForElapsed(String name, long now, Map lastEventMap) { Long lastTime = lastEventMap.computeIfAbsent(name, s -> now); long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS); - log.info("name=" + name + ", lastTime=" + lastTime + ", elapsed=" + elapsed); + log.debug("name=" + name + ", lastTime=" + lastTime + ", elapsed=" + elapsed); if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) { return false; } diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java index ef49879c06f..591f8c3d780 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java @@ -32,8 +32,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.util.concurrent.AtomicDouble; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; +import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; @@ -45,6 +47,8 @@ import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.common.cloud.LiveNodesListener; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.Utils; import org.apache.solr.util.LogLevel; @@ -583,11 +587,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { if (triggerFired.compareAndSet(false, true)) { events.add(event); if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - event.getEventTime(), TimeUnit.NANOSECONDS) <= TimeUnit.MILLISECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { - fail("NodeAddedListener was fired before the configured waitFor period"); + fail(event.source + " was fired before the configured waitFor period"); } getTriggerFiredLatch().countDown(); } else { - fail("NodeAddedTrigger was fired more than once!"); + fail(event.source + " was fired more than once!"); } } catch (Throwable t) { log.debug("--throwable", t); @@ -1209,4 +1213,100 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { // must be larger than cooldown period assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.MILLISECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_MS)); } + + public static class TestSearchRateAction extends TriggerActionBase { + + @Override + public void process(TriggerEvent event, ActionContext context) throws Exception { + try { + events.add(event); + if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - event.getEventTime(), TimeUnit.NANOSECONDS) <= TimeUnit.MILLISECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { + fail(event.source + " was fired before the configured waitFor period"); + } + getTriggerFiredLatch().countDown(); + } catch (Throwable t) { + log.debug("--throwable", t); + throw t; + } + } + } + + @Test + public void testSearchRate() throws Exception { + CloudSolrClient solrClient = cluster.getSolrClient(); + String COLL1 = "collection1"; + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1, + "conf", 1, 2); + create.process(solrClient); + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : 'search_rate_trigger'," + + "'event' : 'searchRate'," + + "'waitFor' : '" + waitForSeconds + "s'," + + "'enabled' : true," + + "'rate' : 1.0," + + "'actions' : [" + + "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" + + "]" + + "}}"; + SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand); + NamedList response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + String setListenerCommand1 = "{" + + "'set-listener' : " + + "{" + + "'name' : 'foo'," + + "'trigger' : 'search_rate_trigger'," + + "'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," + + "'class' : '" + TestTriggerListener.class.getName() + "'" + + "}" + + "}"; + req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + SolrParams query = params(CommonParams.Q, "*:*"); + for (int i = 0; i < 500; i++) { + solrClient.query(COLL1, query); + } + boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS); + assertTrue("The trigger did not fire at all", await); + // wait for listener to capture the SUCCEEDED stage + Thread.sleep(2000); + assertEquals(listenerEvents.toString(), 1, listenerEvents.get("foo").size()); + TestEvent ev = listenerEvents.get("foo").get(0); + long now = timeSource.getTime(); + // verify waitFor + assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) < now - ev.event.getEventTime()); + Map nodeRates = (Map)ev.event.getProperties().get("node"); + assertNotNull("nodeRates", nodeRates); + assertTrue(nodeRates.toString(), nodeRates.size() > 0); + AtomicDouble totalNodeRate = new AtomicDouble(); + nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r)); + List replicaRates = (List)ev.event.getProperties().get("replica"); + assertNotNull("replicaRates", replicaRates); + assertTrue(replicaRates.toString(), replicaRates.size() > 0); + AtomicDouble totalReplicaRate = new AtomicDouble(); + replicaRates.forEach(r -> { + assertTrue(r.toString(), r.getVariable("rate") != null); + totalReplicaRate.addAndGet((Double)r.getVariable("rate")); + }); + Map shardRates = (Map)ev.event.getProperties().get("shard"); + assertNotNull("shardRates", shardRates); + assertEquals(shardRates.toString(), 1, shardRates.size()); + shardRates = (Map)shardRates.get(COLL1); + assertNotNull("shardRates", shardRates); + assertEquals(shardRates.toString(), 1, shardRates.size()); + AtomicDouble totalShardRate = new AtomicDouble(); + shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double)r)); + Map collectionRates = (Map)ev.event.getProperties().get("collection"); + assertNotNull("collectionRates", collectionRates); + assertEquals(collectionRates.toString(), 1, collectionRates.size()); + Double collectionRate = collectionRates.get(COLL1); + assertNotNull(collectionRate); + assertTrue(collectionRate > 5.0); + assertEquals(collectionRate, totalNodeRate.get(), 5.0); + assertEquals(collectionRate, totalShardRate.get(), 5.0); + assertEquals(collectionRate, totalReplicaRate.get(), 5.0); + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java index 5312c291eb4..f8ab422725d 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java @@ -135,6 +135,11 @@ public class AutoScalingConfig implements MapWriter { if (!afterActions.equals(that.afterActions)) return false; return properties.equals(that.properties); } + + @Override + public String toString() { + return Utils.toJSONString(this); + } } /** @@ -232,6 +237,11 @@ public class AutoScalingConfig implements MapWriter { ew.put(entry.getKey(), entry.getValue()); } } + + @Override + public String toString() { + return Utils.toJSONString(this); + } } /** @@ -278,6 +288,11 @@ public class AutoScalingConfig implements MapWriter { return properties.equals(that.properties); } + + @Override + public String toString() { + return Utils.toJSONString(this); + } } /**