mirror of https://github.com/apache/lucene.git
SOLR-11072: Fix creation of searchRate triggers via API, add a unit test, other minor edits.
This commit is contained in:
parent
154edb8e3c
commit
6008b186a2
|
@ -153,6 +153,8 @@ public class AutoScaling {
|
|||
return new NodeAddedTrigger(name, props, loader, dataProvider);
|
||||
case NODELOST:
|
||||
return new NodeLostTrigger(name, props, loader, dataProvider);
|
||||
case SEARCHRATE:
|
||||
return new SearchRateTrigger(name, props, loader, dataProvider);
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -365,6 +366,8 @@ public class ScheduledTriggers implements Closeable {
|
|||
if (stateManager.hasData(statePath)) {
|
||||
stateManager.removeData(statePath, -1);
|
||||
}
|
||||
} catch (NoSuchElementException e) {
|
||||
// already removed by someone else
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to remove state for removed trigger " + statePath, e);
|
||||
}
|
||||
|
@ -378,6 +381,8 @@ public class ScheduledTriggers implements Closeable {
|
|||
ops.add(Op.delete(eventsPath, -1));
|
||||
stateManager.multi(ops);
|
||||
}
|
||||
} catch (NoSuchElementException e) {
|
||||
// already removed by someone else
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to remove events for removed trigger " + eventsPath, e);
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
|
@ -72,7 +73,7 @@ public class SearchRateTrigger extends TriggerBase {
|
|||
collection = (String)properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY);
|
||||
shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
|
||||
if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) {
|
||||
throw new IllegalArgumentException("When 'shard' is other than #ANY collection name must be also other than #ANY");
|
||||
throw new IllegalArgumentException("When 'shard' is other than #ANY then collection name must be also other than #ANY");
|
||||
}
|
||||
node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
|
||||
handler = (String)properties.getOrDefault(AutoScalingParams.HANDLER, "/select");
|
||||
|
@ -231,7 +232,36 @@ public class SearchRateTrigger extends TriggerBase {
|
|||
|
||||
// generate event
|
||||
|
||||
if (processor.process(new SearchRateEvent(getName(), now, hotNodes, hotCollections, hotShards, hotReplicas))) {
|
||||
// find the earliest time when a condition was exceeded
|
||||
final AtomicLong eventTime = new AtomicLong(now);
|
||||
hotCollections.forEach((c, r) -> {
|
||||
long time = lastCollectionEvent.get(c);
|
||||
if (eventTime.get() > time) {
|
||||
eventTime.set(time);
|
||||
}
|
||||
});
|
||||
hotShards.forEach((c, shards) -> {
|
||||
shards.forEach((s, r) -> {
|
||||
long time = lastShardEvent.get(c + "." + s);
|
||||
if (eventTime.get() > time) {
|
||||
eventTime.set(time);
|
||||
}
|
||||
});
|
||||
});
|
||||
hotReplicas.forEach(r -> {
|
||||
long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
|
||||
if (eventTime.get() > time) {
|
||||
eventTime.set(time);
|
||||
}
|
||||
});
|
||||
hotNodes.forEach((n, r) -> {
|
||||
long time = lastNodeEvent.get(n);
|
||||
if (eventTime.get() > time) {
|
||||
eventTime.set(time);
|
||||
}
|
||||
});
|
||||
|
||||
if (processor.process(new SearchRateEvent(getName(), eventTime.get(), hotNodes, hotCollections, hotShards, hotReplicas))) {
|
||||
// update lastEvent times
|
||||
hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
|
||||
hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
|
||||
|
@ -244,7 +274,7 @@ public class SearchRateTrigger extends TriggerBase {
|
|||
private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
|
||||
Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
|
||||
long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
|
||||
log.info("name=" + name + ", lastTime=" + lastTime + ", elapsed=" + elapsed);
|
||||
log.debug("name=" + name + ", lastTime=" + lastTime + ", elapsed=" + elapsed);
|
||||
if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -32,8 +32,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||
|
@ -45,6 +47,8 @@ import org.apache.solr.cloud.SolrCloudTestCase;
|
|||
import org.apache.solr.common.cloud.LiveNodesListener;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
|
@ -583,11 +587,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
|||
if (triggerFired.compareAndSet(false, true)) {
|
||||
events.add(event);
|
||||
if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - event.getEventTime(), TimeUnit.NANOSECONDS) <= TimeUnit.MILLISECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
|
||||
fail("NodeAddedListener was fired before the configured waitFor period");
|
||||
fail(event.source + " was fired before the configured waitFor period");
|
||||
}
|
||||
getTriggerFiredLatch().countDown();
|
||||
} else {
|
||||
fail("NodeAddedTrigger was fired more than once!");
|
||||
fail(event.source + " was fired more than once!");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.debug("--throwable", t);
|
||||
|
@ -1209,4 +1213,100 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
|||
// must be larger than cooldown period
|
||||
assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.MILLISECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_MS));
|
||||
}
|
||||
|
||||
public static class TestSearchRateAction extends TriggerActionBase {
|
||||
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext context) throws Exception {
|
||||
try {
|
||||
events.add(event);
|
||||
if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - event.getEventTime(), TimeUnit.NANOSECONDS) <= TimeUnit.MILLISECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
|
||||
fail(event.source + " was fired before the configured waitFor period");
|
||||
}
|
||||
getTriggerFiredLatch().countDown();
|
||||
} catch (Throwable t) {
|
||||
log.debug("--throwable", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSearchRate() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
String COLL1 = "collection1";
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
|
||||
"conf", 1, 2);
|
||||
create.process(solrClient);
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'search_rate_trigger'," +
|
||||
"'event' : 'searchRate'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'rate' : 1.0," +
|
||||
"'actions' : [" +
|
||||
"{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
String setListenerCommand1 = "{" +
|
||||
"'set-listener' : " +
|
||||
"{" +
|
||||
"'name' : 'foo'," +
|
||||
"'trigger' : 'search_rate_trigger'," +
|
||||
"'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
|
||||
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
SolrParams query = params(CommonParams.Q, "*:*");
|
||||
for (int i = 0; i < 500; i++) {
|
||||
solrClient.query(COLL1, query);
|
||||
}
|
||||
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
// wait for listener to capture the SUCCEEDED stage
|
||||
Thread.sleep(2000);
|
||||
assertEquals(listenerEvents.toString(), 1, listenerEvents.get("foo").size());
|
||||
TestEvent ev = listenerEvents.get("foo").get(0);
|
||||
long now = timeSource.getTime();
|
||||
// verify waitFor
|
||||
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) < now - ev.event.getEventTime());
|
||||
Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get("node");
|
||||
assertNotNull("nodeRates", nodeRates);
|
||||
assertTrue(nodeRates.toString(), nodeRates.size() > 0);
|
||||
AtomicDouble totalNodeRate = new AtomicDouble();
|
||||
nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
|
||||
List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get("replica");
|
||||
assertNotNull("replicaRates", replicaRates);
|
||||
assertTrue(replicaRates.toString(), replicaRates.size() > 0);
|
||||
AtomicDouble totalReplicaRate = new AtomicDouble();
|
||||
replicaRates.forEach(r -> {
|
||||
assertTrue(r.toString(), r.getVariable("rate") != null);
|
||||
totalReplicaRate.addAndGet((Double)r.getVariable("rate"));
|
||||
});
|
||||
Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get("shard");
|
||||
assertNotNull("shardRates", shardRates);
|
||||
assertEquals(shardRates.toString(), 1, shardRates.size());
|
||||
shardRates = (Map<String, Object>)shardRates.get(COLL1);
|
||||
assertNotNull("shardRates", shardRates);
|
||||
assertEquals(shardRates.toString(), 1, shardRates.size());
|
||||
AtomicDouble totalShardRate = new AtomicDouble();
|
||||
shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double)r));
|
||||
Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get("collection");
|
||||
assertNotNull("collectionRates", collectionRates);
|
||||
assertEquals(collectionRates.toString(), 1, collectionRates.size());
|
||||
Double collectionRate = collectionRates.get(COLL1);
|
||||
assertNotNull(collectionRate);
|
||||
assertTrue(collectionRate > 5.0);
|
||||
assertEquals(collectionRate, totalNodeRate.get(), 5.0);
|
||||
assertEquals(collectionRate, totalShardRate.get(), 5.0);
|
||||
assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -135,6 +135,11 @@ public class AutoScalingConfig implements MapWriter {
|
|||
if (!afterActions.equals(that.afterActions)) return false;
|
||||
return properties.equals(that.properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -232,6 +237,11 @@ public class AutoScalingConfig implements MapWriter {
|
|||
ew.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -278,6 +288,11 @@ public class AutoScalingConfig implements MapWriter {
|
|||
|
||||
return properties.equals(that.properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue