mirror of https://github.com/apache/lucene.git
SOLR-11066: Implement a scheduled autoscaling trigger that runs on a fixed interval beginning with a given start time
This commit is contained in:
parent
82a9984071
commit
137e647f2c
|
@ -170,6 +170,9 @@ New Features
|
|||
|
||||
* SOLR-12006: Add a '*_t' and '*_t_sort' dynamic field for single valued text fields (Varun Thacker)
|
||||
|
||||
* SOLR-11066: Implement a scheduled autoscaling trigger that runs on a fixed interval beginning with a
|
||||
given start time. (David Smiley, Gus Heck, ab, shalin)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -157,6 +157,8 @@ public class AutoScaling {
|
|||
return new SearchRateTrigger(name, props, loader, cloudManager);
|
||||
case METRIC:
|
||||
return new MetricTrigger(name, props, loader, cloudManager);
|
||||
case SCHEDULED:
|
||||
return new ScheduledTrigger(name, props, loader, cloudManager);
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
|
||||
}
|
||||
|
|
|
@ -200,6 +200,11 @@ public class ComputePlanAction extends TriggerActionBase {
|
|||
event.getProperties().put(START, start);
|
||||
}
|
||||
break;
|
||||
case SCHEDULED:
|
||||
String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP);
|
||||
CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp);
|
||||
suggester = session.getSuggester(action);
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("No support for events other than nodeAdded, nodeLost, searchRate and metric. Received: " + event.getEventType());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.text.ParseException;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.format.DateTimeFormatterBuilder;
|
||||
import java.time.temporal.ChronoField;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.util.DateMathParser;
|
||||
import org.apache.solr.util.TimeZoneUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
|
||||
|
||||
/**
|
||||
* A trigger which creates {@link TriggerEventType#SCHEDULED} events as per the configured schedule
|
||||
*/
|
||||
public class ScheduledTrigger extends TriggerBase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final String DEFAULT_GRACE_DURATION = "+15MINUTES";
|
||||
static final String ACTUAL_EVENT_TIME = "actualEventTime";
|
||||
|
||||
private final String everyStr;
|
||||
|
||||
private final String graceDurationStr;
|
||||
|
||||
private final String preferredOp;
|
||||
|
||||
private final TimeZone timeZone;
|
||||
|
||||
private Instant lastRunAt;
|
||||
|
||||
public ScheduledTrigger(String name, Map<String, Object> properties,
|
||||
SolrResourceLoader loader, SolrCloudManager cloudManager) {
|
||||
super(TriggerEventType.SCHEDULED, name, properties, loader, cloudManager);
|
||||
|
||||
String timeZoneStr = (String) properties.get("timeZone");
|
||||
this.timeZone = TimeZoneUtils.parseTimezone(timeZoneStr); // defaults to UTC
|
||||
|
||||
String startTimeStr = (String) properties.get("startTime");
|
||||
this.everyStr = (String) properties.get("every");
|
||||
this.graceDurationStr = (String) properties.getOrDefault("graceDuration", DEFAULT_GRACE_DURATION);
|
||||
|
||||
preferredOp = (String) properties.getOrDefault(PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
|
||||
|
||||
// attempt parsing to validate date math strings
|
||||
Instant startTime = parseStartTime(startTimeStr, timeZoneStr);
|
||||
DateMathParser.parseMath(null, startTime + everyStr, timeZone);
|
||||
DateMathParser.parseMath(null, startTime + graceDurationStr, timeZone);
|
||||
|
||||
this.lastRunAt = startTime;
|
||||
}
|
||||
|
||||
private Instant parseStartTime(String startTimeStr, String timeZoneStr) {
|
||||
if (startTimeStr == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Parameter 'startTime' cannot be null");
|
||||
}
|
||||
try {
|
||||
// try parsing startTime as an ISO-8601 date time string
|
||||
return DateMathParser.parseMath(null, startTimeStr).toInstant();
|
||||
} catch (SolrException e) {
|
||||
if (e.code() != SolrException.ErrorCode.BAD_REQUEST.code) throw e;
|
||||
}
|
||||
if (timeZoneStr == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Either 'startTime' should be an ISO-8601 date time string or 'timeZone' must be not be null");
|
||||
}
|
||||
DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
|
||||
.append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("['T'[HH[:mm[:ss]]]]")
|
||||
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
|
||||
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
|
||||
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
|
||||
.toFormatter(Locale.ROOT).withZone(ZoneId.of(timeZoneStr));
|
||||
return Instant.from(dateTimeFormatter.parse(startTimeStr));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> getState() {
|
||||
return Collections.singletonMap("lastRunAt", lastRunAt.toEpochMilli());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setState(Map<String, Object> state) {
|
||||
if (state.containsKey("lastRunAt")) {
|
||||
this.lastRunAt = Instant.ofEpochMilli((Long) state.get("lastRunAt"));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreState(AutoScaling.Trigger old) {
|
||||
assert old.isClosed();
|
||||
if (old instanceof ScheduledTrigger) {
|
||||
ScheduledTrigger scheduledTrigger = (ScheduledTrigger) old;
|
||||
this.lastRunAt = scheduledTrigger.lastRunAt;
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
|
||||
"Unable to restore state from an unknown type of trigger");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (this) {
|
||||
if (isClosed) {
|
||||
log.warn("ScheduledTrigger ran but was already closed");
|
||||
throw new RuntimeException("Trigger has been closed");
|
||||
}
|
||||
}
|
||||
|
||||
DateMathParser dateMathParser = new DateMathParser(timeZone);
|
||||
dateMathParser.setNow(new Date(lastRunAt.toEpochMilli()));
|
||||
Instant nextRunTime, nextPlusGrace;
|
||||
try {
|
||||
Date next = dateMathParser.parseMath(everyStr);
|
||||
dateMathParser.setNow(next);
|
||||
nextPlusGrace = dateMathParser.parseMath(graceDurationStr).toInstant();
|
||||
nextRunTime = next.toInstant();
|
||||
} catch (ParseException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Unable to calculate next run time. lastRan: " + lastRunAt.toString() + " and date math string: " + everyStr, e);
|
||||
}
|
||||
|
||||
Instant now = Instant.now(); // todo how to play well with simulation framework?
|
||||
AutoScaling.TriggerEventProcessor processor = processorRef.get();
|
||||
|
||||
if (now.isBefore(nextRunTime)) {
|
||||
return; // it's not time yet
|
||||
}
|
||||
if (now.isAfter(nextPlusGrace)) {
|
||||
// we are past time and we could not run per schedule so skip this event
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn("ScheduledTrigger was not able to run event at scheduled time: {}. Now: {}",
|
||||
nextRunTime, now);
|
||||
}
|
||||
if (processor.process(new ScheduledEvent(getEventType(), getName(), nextRunTime.toEpochMilli(),
|
||||
preferredOp, now.toEpochMilli(), true))) {
|
||||
lastRunAt = nextRunTime;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (processor != null) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("ScheduledTrigger {} firing registered processor for scheduled time {}, now={}", name,
|
||||
nextRunTime, now);
|
||||
}
|
||||
if (processor.process(new ScheduledEvent(getEventType(), getName(), nextRunTime.toEpochMilli(),
|
||||
preferredOp, now.toEpochMilli()))) {
|
||||
lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
|
||||
}
|
||||
} else {
|
||||
lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
|
||||
}
|
||||
}
|
||||
|
||||
public static class ScheduledEvent extends TriggerEvent {
|
||||
public ScheduledEvent(TriggerEventType eventType, String source, long eventTime, String preferredOp, long actualEventTime) {
|
||||
this(eventType, source, eventTime, preferredOp, actualEventTime, false);
|
||||
}
|
||||
|
||||
public ScheduledEvent(TriggerEventType eventType, String source, long eventTime, String preferredOp, long actualEventTime, boolean ignored) {
|
||||
super(eventType, source, eventTime, null, ignored);
|
||||
properties.put(PREFERRED_OP, preferredOp);
|
||||
properties.put(ACTUAL_EVENT_TIME, actualEventTime);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -248,6 +248,12 @@ public class ScheduledTriggers implements Closeable {
|
|||
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
|
||||
return false;
|
||||
}
|
||||
if (event.isIgnored()) {
|
||||
log.debug("-------- Ignoring event: " + event);
|
||||
event.getProperties().put(TriggerEvent.IGNORED, true);
|
||||
listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Event was ignored.");
|
||||
return true; // always return true for ignored events
|
||||
}
|
||||
// even though we pause all triggers during action execution there is a possibility that a trigger was already
|
||||
// running at the time and would have already created an event so we reject such events during cooldown period
|
||||
if (cooldownStart.get() + cooldownPeriod.get() > cloudManager.getTimeSource().getTime()) {
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.solr.util.IdUtils;
|
|||
* Trigger event.
|
||||
*/
|
||||
public class TriggerEvent implements MapWriter {
|
||||
public static final String IGNORED = "ignored";
|
||||
public static final String COOLDOWN = "cooldown";
|
||||
public static final String REPLAYING = "replaying";
|
||||
public static final String NODE_NAMES = "nodeNames";
|
||||
|
@ -77,14 +78,25 @@ public class TriggerEvent implements MapWriter {
|
|||
protected final long eventTime;
|
||||
protected final TriggerEventType eventType;
|
||||
protected final Map<String, Object> properties = new HashMap<>();
|
||||
protected final boolean ignored;
|
||||
|
||||
public TriggerEvent(TriggerEventType eventType, String source, long eventTime,
|
||||
Map<String, Object> properties) {
|
||||
this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties);
|
||||
this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties, false);
|
||||
}
|
||||
|
||||
public TriggerEvent(TriggerEventType eventType, String source, long eventTime,
|
||||
Map<String, Object> properties, boolean ignored) {
|
||||
this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties, ignored);
|
||||
}
|
||||
|
||||
public TriggerEvent(String id, TriggerEventType eventType, String source, long eventTime,
|
||||
Map<String, Object> properties) {
|
||||
this(id, eventType, source, eventTime, properties, false);
|
||||
}
|
||||
|
||||
public TriggerEvent(String id, TriggerEventType eventType, String source, long eventTime,
|
||||
Map<String, Object> properties, boolean ignored) {
|
||||
this.id = id;
|
||||
this.eventType = eventType;
|
||||
this.source = source;
|
||||
|
@ -92,6 +104,7 @@ public class TriggerEvent implements MapWriter {
|
|||
if (properties != null) {
|
||||
this.properties.putAll(properties);
|
||||
}
|
||||
this.ignored = ignored;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -150,6 +163,10 @@ public class TriggerEvent implements MapWriter {
|
|||
return eventType;
|
||||
}
|
||||
|
||||
public boolean isIgnored() {
|
||||
return ignored;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set event properties.
|
||||
*
|
||||
|
@ -169,6 +186,9 @@ public class TriggerEvent implements MapWriter {
|
|||
ew.put("eventTime", eventTime);
|
||||
ew.put("eventType", eventType.toString());
|
||||
ew.put("properties", properties);
|
||||
if (ignored) {
|
||||
ew.put("ignored", true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -182,6 +202,7 @@ public class TriggerEvent implements MapWriter {
|
|||
if (!id.equals(that.id)) return false;
|
||||
if (!source.equals(that.source)) return false;
|
||||
if (eventType != that.eventType) return false;
|
||||
if (ignored != that.ignored) return false;
|
||||
return properties.equals(that.properties);
|
||||
}
|
||||
|
||||
|
@ -192,6 +213,7 @@ public class TriggerEvent implements MapWriter {
|
|||
result = 31 * result + (int) (eventTime ^ (eventTime >>> 32));
|
||||
result = 31 * result + eventType.hashCode();
|
||||
result = 31 * result + properties.hashCode();
|
||||
result = 31 * result + Boolean.hashCode(ignored);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.format.DateTimeFormatterBuilder;
|
||||
import java.time.temporal.ChronoField;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test for {@link ScheduledTrigger}
|
||||
*/
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
|
||||
public class ScheduledTriggerTest extends SolrCloudTestCase {
|
||||
|
||||
private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
|
||||
fail("Did not expect the listener to fire on first run!");
|
||||
return true;
|
||||
};
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(1)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTrigger() throws Exception {
|
||||
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||
|
||||
Map<String, Object> properties = createTriggerProperties(new Date().toInstant().toString(), TimeZone.getDefault().getID());
|
||||
|
||||
scheduledTriggerTest(container, properties);
|
||||
|
||||
TimeZone timeZone = TimeZone.getDefault();
|
||||
DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
|
||||
.append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("['T'[HH[:mm[:ss]]]]") //brackets mean optional
|
||||
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
|
||||
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
|
||||
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
|
||||
.toFormatter(Locale.ROOT).withZone(timeZone.toZoneId());
|
||||
properties = createTriggerProperties(dateTimeFormatter.format(Instant.now()), timeZone.getID());
|
||||
scheduledTriggerTest(container, properties);
|
||||
}
|
||||
|
||||
public void testIgnoredEvent() throws Exception {
|
||||
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||
long now = System.currentTimeMillis();
|
||||
long threeDaysAgo = now - TimeUnit.DAYS.toMillis(3);
|
||||
Map<String, Object> properties = createTriggerProperties(new Date(threeDaysAgo).toInstant().toString(),
|
||||
TimeZone.getDefault().getID(),
|
||||
"+2DAYS", "+1HOUR");
|
||||
try (ScheduledTrigger scheduledTrigger = new ScheduledTrigger("sched1", properties,
|
||||
container.getResourceLoader(), container.getZkController().getSolrCloudManager())) {
|
||||
scheduledTrigger.init();
|
||||
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
|
||||
scheduledTrigger.setProcessor(event -> {
|
||||
eventRef.set(event);
|
||||
return true;
|
||||
});
|
||||
scheduledTrigger.run();
|
||||
assertTrue(eventRef.get().isIgnored());
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduledTriggerTest(CoreContainer container, Map<String, Object> properties) throws IOException, InterruptedException {
|
||||
try (ScheduledTrigger scheduledTrigger = new ScheduledTrigger("sched1", properties,
|
||||
container.getResourceLoader(), container.getZkController().getSolrCloudManager())) {
|
||||
scheduledTrigger.init();
|
||||
scheduledTrigger.setProcessor(noFirstRunProcessor);
|
||||
scheduledTrigger.run();
|
||||
final List<Long> eventTimes = new ArrayList<>();
|
||||
scheduledTrigger.setProcessor(event -> {
|
||||
eventTimes.add(event.getEventTime());
|
||||
return true;
|
||||
});
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Thread.sleep(3000);
|
||||
scheduledTrigger.run();
|
||||
}
|
||||
assertEquals(3, eventTimes.size());
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Object> createTriggerProperties(String startTime, String timeZone) {
|
||||
return createTriggerProperties(startTime, timeZone, "+3SECOND", "+2SECOND");
|
||||
}
|
||||
|
||||
private Map<String, Object> createTriggerProperties(String startTime, String timeZone, String every, String graceTime) {
|
||||
Map<String, Object> properties = new HashMap<>();
|
||||
properties.put("graceTime", graceTime);
|
||||
properties.put("startTime", startTime);
|
||||
properties.put("timeZone", timeZone);
|
||||
properties.put("every", every);
|
||||
List<Map<String, String>> actions = new ArrayList<>(3);
|
||||
Map<String, String> map = new HashMap<>(2);
|
||||
map.put("name", "compute_plan");
|
||||
map.put("class", "solr.ComputePlanAction");
|
||||
actions.add(map);
|
||||
map = new HashMap<>(2);
|
||||
map.put("name", "execute_plan");
|
||||
map.put("class", "solr.ExecutePlanAction");
|
||||
actions.add(map);
|
||||
properties.put("actions", actions);
|
||||
return properties;
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
|
|||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -31,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
|
@ -1630,4 +1632,64 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
|||
docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||
assertEquals(3, docCollection.getReplicas().size());
|
||||
}
|
||||
|
||||
public void testScheduledTrigger() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
|
||||
String collectionName = "testScheduledTrigger";
|
||||
CollectionAdminRequest.createCollection(collectionName, 2, 3)
|
||||
.setMaxShardsPerNode(5).process(solrClient);
|
||||
waitForState("", collectionName, clusterShape(2, 3));
|
||||
|
||||
String setClusterPolicy = "{\n" +
|
||||
" \"set-cluster-policy\" : [\n" +
|
||||
" {\"cores\" : \"<3\", \"node\" : \"#EACH\"}\n" +
|
||||
" ]\n" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicy);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||
cluster.waitForAllNodes(10);
|
||||
|
||||
// first trigger
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'sched_trigger_integration1'," +
|
||||
"'event' : 'scheduled'," +
|
||||
"'startTime' : '" + new Date().toInstant().toString() + "'" +
|
||||
"'every' : '+3SECONDS'" +
|
||||
"'actions' : [" +
|
||||
"{'name' : 'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name' : 'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||
"{'name' : 'recorder', 'class': '" + ContextPropertiesRecorderAction.class.getName() + "'}" +
|
||||
"]}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
assertTrue("ScheduledTrigger did not fire within 20 seconds", triggerFiredLatch.await(600, TimeUnit.SECONDS));
|
||||
assertEquals(1, events.size());
|
||||
Map<String, Object> actionContextProps = actionContextPropertiesRef.get();
|
||||
assertNotNull(actionContextProps);
|
||||
TriggerEvent event = events.iterator().next();
|
||||
List<SolrRequest> operations = (List<SolrRequest>) actionContextProps.get("operations");
|
||||
assertNotNull(operations);
|
||||
assertEquals(2, operations.size());
|
||||
for (SolrRequest operation : operations) {
|
||||
SolrParams params = operation.getParams();
|
||||
assertEquals(newNode.getNodeName(), params.get("targetNode"));
|
||||
}
|
||||
}
|
||||
|
||||
private static AtomicReference<Map<String, Object>> actionContextPropertiesRef = new AtomicReference<>();
|
||||
|
||||
public static class ContextPropertiesRecorderAction extends TestEventMarkerAction {
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||
actionContextPropertiesRef.set(actionContext.getProperties());
|
||||
super.process(event, actionContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
Triggers are used in autoscaling to watch for cluster events such as nodes joining or leaving.
|
||||
Triggers are used in autoscaling to watch for cluster events such as nodes joining, leaving, search rate or any other metric breaching a threshold.
|
||||
|
||||
In the future other cluster, node, and replica events that are important from the
|
||||
point of view of cluster performance will also have available triggers.
|
||||
|
@ -27,7 +27,7 @@ change that merits attention they generate events, which are then queued and pro
|
|||
resources (e.g., move replicas). Solr provides predefined implementations of triggers for specific event types.
|
||||
|
||||
Triggers execute on the node that runs `Overseer`. They are scheduled to run periodically,
|
||||
currently at fixed interval of 1 second between each execution (not every execution produces events).
|
||||
currently at default interval of 1 second between each execution (not every execution produces events).
|
||||
|
||||
== Event Types
|
||||
Currently the following event types (and corresponding trigger implementations) are defined:
|
||||
|
@ -36,6 +36,7 @@ Currently the following event types (and corresponding trigger implementations)
|
|||
* `nodeLost` - generated when a node leaves the cluster
|
||||
* `metric` - generated when the configured metric crosses a configured lower or upper threshold value
|
||||
* `searchRate` - generated when the 1 min average search rate exceeds configured upper threshold
|
||||
* `scheduled` - generated according to a scheduled time period such as every 24 hours etc
|
||||
|
||||
Events are not necessarily generated immediately after the corresponding state change occurred - the
|
||||
maximum rate of events is controlled by the `waitFor` configuration parameter (see below).
|
||||
|
@ -138,6 +139,23 @@ the threshold rate and the current request rate.
|
|||
}
|
||||
----
|
||||
|
||||
== Scheduled trigger
|
||||
|
||||
The Scheduled trigger generates events according to a fixed rate schedule.
|
||||
|
||||
The trigger supports the following configuration:
|
||||
|
||||
* `startTime` - (string, required) the start date/time of the schedule. This should either be an ISO-8601 date time string (the same standard used during search and indexing in Solr, thus defaulting to UTC) or be specified with the `timeZone` parameter.
|
||||
* `every` - (string, required) a positive Solr date math string which is added to the `startTime` or the last run time to arrive at the next scheduled time
|
||||
* `graceTime` - (string, optional) a positive Solr date math string. This is the additional grace time over the scheduled time within which the trigger is allowed to generate an event.
|
||||
* `timeZone` - (string, optional) a time zone string which is used for calculating the scheduled times
|
||||
* `preferredOp` - (string, optional, defaults to `MOVEREPLICA`) the preferred operation to perform in response to an event generated by this trigger. The only supported values are `MOVEREPLICA` and `ADDREPLICA`.
|
||||
|
||||
This trigger applies the `every` date math expression on the `startTime` or the last event time to derive the next scheduled time and if current time is greater than next scheduled time but within `graceTime` then an event is generated.
|
||||
|
||||
Apart from the common event properties described in the Event Types section, the trigger adds an additional `actualEventTime` event property which has the actual event time as opposed to the scheduled time.
|
||||
For example, if the scheduled time was `2018-01-31T15:30:00Z` and grace time was `+15MINUTES` then an event may be fired at `2018-01-31T15:45:00Z`. Such an event will have `eventTime` as `2018-01-31T15:30:00Z` i.e. the scheduled time but the `actualEventTime` property will have a value of `2018-01-31T15:45:00Z`.
|
||||
|
||||
== Trigger Configuration
|
||||
Trigger configurations are managed using the Autoscaling Write API and the commands `set-trigger`, `remove-trigger`,
|
||||
`suspend-trigger`, and `resume-trigger`.
|
||||
|
|
|
@ -291,6 +291,13 @@ public class MiniSolrCloudCluster {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for all Solr nodes to be live
|
||||
*
|
||||
* @param timeout number of seconds to wait before throwing an IllegalStateException
|
||||
* @throws IOException if there was an error communicating with ZooKeeper
|
||||
* @throws InterruptedException if the calling thread is interrupted during the wait operation
|
||||
*/
|
||||
public void waitForAllNodes(int timeout) throws IOException, InterruptedException {
|
||||
waitForAllNodes(jettys.size(), timeout);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue