mirror of https://github.com/apache/lucene.git
SOLR-11066: Implement a scheduled autoscaling trigger that runs on a fixed interval beginning with a given start time
This commit is contained in:
parent
79c2988547
commit
71fc9cd43d
|
@ -172,6 +172,9 @@ New Features
|
||||||
|
|
||||||
* SOLR-11597: Add contrib/ltr NeuralNetworkModel class. (Michael A. Alcorn, Yuki Yano, Christine Poerschke)
|
* SOLR-11597: Add contrib/ltr NeuralNetworkModel class. (Michael A. Alcorn, Yuki Yano, Christine Poerschke)
|
||||||
|
|
||||||
|
* SOLR-11066: Implement a scheduled autoscaling trigger that runs on a fixed interval beginning with a
|
||||||
|
given start time. (David Smiley, Gus Heck, ab, shalin)
|
||||||
|
|
||||||
Bug Fixes
|
Bug Fixes
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -157,6 +157,8 @@ public class AutoScaling {
|
||||||
return new SearchRateTrigger(name, props, loader, cloudManager);
|
return new SearchRateTrigger(name, props, loader, cloudManager);
|
||||||
case METRIC:
|
case METRIC:
|
||||||
return new MetricTrigger(name, props, loader, cloudManager);
|
return new MetricTrigger(name, props, loader, cloudManager);
|
||||||
|
case SCHEDULED:
|
||||||
|
return new ScheduledTrigger(name, props, loader, cloudManager);
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
|
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
|
||||||
}
|
}
|
||||||
|
|
|
@ -208,6 +208,11 @@ public class ComputePlanAction extends TriggerActionBase {
|
||||||
event.getProperties().put(START, start);
|
event.getProperties().put(START, start);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case SCHEDULED:
|
||||||
|
String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP);
|
||||||
|
CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp);
|
||||||
|
suggester = session.getSuggester(action);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new UnsupportedOperationException("No support for events other than nodeAdded, nodeLost, searchRate and metric. Received: " + event.getEventType());
|
throw new UnsupportedOperationException("No support for events other than nodeAdded, nodeLost, searchRate and metric. Received: " + event.getEventType());
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,198 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.text.ParseException;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.ZoneId;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.time.format.DateTimeFormatterBuilder;
|
||||||
|
import java.time.temporal.ChronoField;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TimeZone;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||||
|
import org.apache.solr.common.SolrException;
|
||||||
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
|
import org.apache.solr.core.SolrResourceLoader;
|
||||||
|
import org.apache.solr.util.DateMathParser;
|
||||||
|
import org.apache.solr.util.TimeZoneUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A trigger which creates {@link TriggerEventType#SCHEDULED} events as per the configured schedule
|
||||||
|
*/
|
||||||
|
public class ScheduledTrigger extends TriggerBase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static final String DEFAULT_GRACE_DURATION = "+15MINUTES";
|
||||||
|
static final String ACTUAL_EVENT_TIME = "actualEventTime";
|
||||||
|
|
||||||
|
private final String everyStr;
|
||||||
|
|
||||||
|
private final String graceDurationStr;
|
||||||
|
|
||||||
|
private final String preferredOp;
|
||||||
|
|
||||||
|
private final TimeZone timeZone;
|
||||||
|
|
||||||
|
private Instant lastRunAt;
|
||||||
|
|
||||||
|
public ScheduledTrigger(String name, Map<String, Object> properties,
|
||||||
|
SolrResourceLoader loader, SolrCloudManager cloudManager) {
|
||||||
|
super(TriggerEventType.SCHEDULED, name, properties, loader, cloudManager);
|
||||||
|
|
||||||
|
String timeZoneStr = (String) properties.get("timeZone");
|
||||||
|
this.timeZone = TimeZoneUtils.parseTimezone(timeZoneStr); // defaults to UTC
|
||||||
|
|
||||||
|
String startTimeStr = (String) properties.get("startTime");
|
||||||
|
this.everyStr = (String) properties.get("every");
|
||||||
|
this.graceDurationStr = (String) properties.getOrDefault("graceDuration", DEFAULT_GRACE_DURATION);
|
||||||
|
|
||||||
|
preferredOp = (String) properties.getOrDefault(PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
|
||||||
|
|
||||||
|
// attempt parsing to validate date math strings
|
||||||
|
Instant startTime = parseStartTime(startTimeStr, timeZoneStr);
|
||||||
|
DateMathParser.parseMath(null, startTime + everyStr, timeZone);
|
||||||
|
DateMathParser.parseMath(null, startTime + graceDurationStr, timeZone);
|
||||||
|
|
||||||
|
this.lastRunAt = startTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Instant parseStartTime(String startTimeStr, String timeZoneStr) {
|
||||||
|
if (startTimeStr == null) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Parameter 'startTime' cannot be null");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// try parsing startTime as an ISO-8601 date time string
|
||||||
|
return DateMathParser.parseMath(null, startTimeStr).toInstant();
|
||||||
|
} catch (SolrException e) {
|
||||||
|
if (e.code() != SolrException.ErrorCode.BAD_REQUEST.code) throw e;
|
||||||
|
}
|
||||||
|
if (timeZoneStr == null) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||||
|
"Either 'startTime' should be an ISO-8601 date time string or 'timeZone' must be not be null");
|
||||||
|
}
|
||||||
|
DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
|
||||||
|
.append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("['T'[HH[:mm[:ss]]]]")
|
||||||
|
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
|
||||||
|
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
|
||||||
|
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
|
||||||
|
.toFormatter(Locale.ROOT).withZone(ZoneId.of(timeZoneStr));
|
||||||
|
return Instant.from(dateTimeFormatter.parse(startTimeStr));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Map<String, Object> getState() {
|
||||||
|
return Collections.singletonMap("lastRunAt", lastRunAt.toEpochMilli());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setState(Map<String, Object> state) {
|
||||||
|
if (state.containsKey("lastRunAt")) {
|
||||||
|
this.lastRunAt = Instant.ofEpochMilli((Long) state.get("lastRunAt"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void restoreState(AutoScaling.Trigger old) {
|
||||||
|
assert old.isClosed();
|
||||||
|
if (old instanceof ScheduledTrigger) {
|
||||||
|
ScheduledTrigger scheduledTrigger = (ScheduledTrigger) old;
|
||||||
|
this.lastRunAt = scheduledTrigger.lastRunAt;
|
||||||
|
} else {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
|
||||||
|
"Unable to restore state from an unknown type of trigger");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
synchronized (this) {
|
||||||
|
if (isClosed) {
|
||||||
|
log.warn("ScheduledTrigger ran but was already closed");
|
||||||
|
throw new RuntimeException("Trigger has been closed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
DateMathParser dateMathParser = new DateMathParser(timeZone);
|
||||||
|
dateMathParser.setNow(new Date(lastRunAt.toEpochMilli()));
|
||||||
|
Instant nextRunTime, nextPlusGrace;
|
||||||
|
try {
|
||||||
|
Date next = dateMathParser.parseMath(everyStr);
|
||||||
|
dateMathParser.setNow(next);
|
||||||
|
nextPlusGrace = dateMathParser.parseMath(graceDurationStr).toInstant();
|
||||||
|
nextRunTime = next.toInstant();
|
||||||
|
} catch (ParseException e) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||||
|
"Unable to calculate next run time. lastRan: " + lastRunAt.toString() + " and date math string: " + everyStr, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Instant now = Instant.now(); // todo how to play well with simulation framework?
|
||||||
|
AutoScaling.TriggerEventProcessor processor = processorRef.get();
|
||||||
|
|
||||||
|
if (now.isBefore(nextRunTime)) {
|
||||||
|
return; // it's not time yet
|
||||||
|
}
|
||||||
|
if (now.isAfter(nextPlusGrace)) {
|
||||||
|
// we are past time and we could not run per schedule so skip this event
|
||||||
|
if (log.isWarnEnabled()) {
|
||||||
|
log.warn("ScheduledTrigger was not able to run event at scheduled time: {}. Now: {}",
|
||||||
|
nextRunTime, now);
|
||||||
|
}
|
||||||
|
if (processor.process(new ScheduledEvent(getEventType(), getName(), nextRunTime.toEpochMilli(),
|
||||||
|
preferredOp, now.toEpochMilli(), true))) {
|
||||||
|
lastRunAt = nextRunTime;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (processor != null) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("ScheduledTrigger {} firing registered processor for scheduled time {}, now={}", name,
|
||||||
|
nextRunTime, now);
|
||||||
|
}
|
||||||
|
if (processor.process(new ScheduledEvent(getEventType(), getName(), nextRunTime.toEpochMilli(),
|
||||||
|
preferredOp, now.toEpochMilli()))) {
|
||||||
|
lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ScheduledEvent extends TriggerEvent {
|
||||||
|
public ScheduledEvent(TriggerEventType eventType, String source, long eventTime, String preferredOp, long actualEventTime) {
|
||||||
|
this(eventType, source, eventTime, preferredOp, actualEventTime, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScheduledEvent(TriggerEventType eventType, String source, long eventTime, String preferredOp, long actualEventTime, boolean ignored) {
|
||||||
|
super(eventType, source, eventTime, null, ignored);
|
||||||
|
properties.put(PREFERRED_OP, preferredOp);
|
||||||
|
properties.put(ACTUAL_EVENT_TIME, actualEventTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -248,6 +248,12 @@ public class ScheduledTriggers implements Closeable {
|
||||||
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
|
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (event.isIgnored()) {
|
||||||
|
log.debug("-------- Ignoring event: " + event);
|
||||||
|
event.getProperties().put(TriggerEvent.IGNORED, true);
|
||||||
|
listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Event was ignored.");
|
||||||
|
return true; // always return true for ignored events
|
||||||
|
}
|
||||||
// even though we pause all triggers during action execution there is a possibility that a trigger was already
|
// even though we pause all triggers during action execution there is a possibility that a trigger was already
|
||||||
// running at the time and would have already created an event so we reject such events during cooldown period
|
// running at the time and would have already created an event so we reject such events during cooldown period
|
||||||
if (cooldownStart.get() + cooldownPeriod.get() > cloudManager.getTimeSource().getTime()) {
|
if (cooldownStart.get() + cooldownPeriod.get() > cloudManager.getTimeSource().getTime()) {
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.solr.util.IdUtils;
|
||||||
* Trigger event.
|
* Trigger event.
|
||||||
*/
|
*/
|
||||||
public class TriggerEvent implements MapWriter {
|
public class TriggerEvent implements MapWriter {
|
||||||
|
public static final String IGNORED = "ignored";
|
||||||
public static final String COOLDOWN = "cooldown";
|
public static final String COOLDOWN = "cooldown";
|
||||||
public static final String REPLAYING = "replaying";
|
public static final String REPLAYING = "replaying";
|
||||||
public static final String NODE_NAMES = "nodeNames";
|
public static final String NODE_NAMES = "nodeNames";
|
||||||
|
@ -77,14 +78,25 @@ public class TriggerEvent implements MapWriter {
|
||||||
protected final long eventTime;
|
protected final long eventTime;
|
||||||
protected final TriggerEventType eventType;
|
protected final TriggerEventType eventType;
|
||||||
protected final Map<String, Object> properties = new HashMap<>();
|
protected final Map<String, Object> properties = new HashMap<>();
|
||||||
|
protected final boolean ignored;
|
||||||
|
|
||||||
public TriggerEvent(TriggerEventType eventType, String source, long eventTime,
|
public TriggerEvent(TriggerEventType eventType, String source, long eventTime,
|
||||||
Map<String, Object> properties) {
|
Map<String, Object> properties) {
|
||||||
this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties);
|
this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TriggerEvent(TriggerEventType eventType, String source, long eventTime,
|
||||||
|
Map<String, Object> properties, boolean ignored) {
|
||||||
|
this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties, ignored);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TriggerEvent(String id, TriggerEventType eventType, String source, long eventTime,
|
public TriggerEvent(String id, TriggerEventType eventType, String source, long eventTime,
|
||||||
Map<String, Object> properties) {
|
Map<String, Object> properties) {
|
||||||
|
this(id, eventType, source, eventTime, properties, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TriggerEvent(String id, TriggerEventType eventType, String source, long eventTime,
|
||||||
|
Map<String, Object> properties, boolean ignored) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.eventType = eventType;
|
this.eventType = eventType;
|
||||||
this.source = source;
|
this.source = source;
|
||||||
|
@ -92,6 +104,7 @@ public class TriggerEvent implements MapWriter {
|
||||||
if (properties != null) {
|
if (properties != null) {
|
||||||
this.properties.putAll(properties);
|
this.properties.putAll(properties);
|
||||||
}
|
}
|
||||||
|
this.ignored = ignored;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -150,6 +163,10 @@ public class TriggerEvent implements MapWriter {
|
||||||
return eventType;
|
return eventType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isIgnored() {
|
||||||
|
return ignored;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set event properties.
|
* Set event properties.
|
||||||
*
|
*
|
||||||
|
@ -169,6 +186,9 @@ public class TriggerEvent implements MapWriter {
|
||||||
ew.put("eventTime", eventTime);
|
ew.put("eventTime", eventTime);
|
||||||
ew.put("eventType", eventType.toString());
|
ew.put("eventType", eventType.toString());
|
||||||
ew.put("properties", properties);
|
ew.put("properties", properties);
|
||||||
|
if (ignored) {
|
||||||
|
ew.put("ignored", true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -182,6 +202,7 @@ public class TriggerEvent implements MapWriter {
|
||||||
if (!id.equals(that.id)) return false;
|
if (!id.equals(that.id)) return false;
|
||||||
if (!source.equals(that.source)) return false;
|
if (!source.equals(that.source)) return false;
|
||||||
if (eventType != that.eventType) return false;
|
if (eventType != that.eventType) return false;
|
||||||
|
if (ignored != that.ignored) return false;
|
||||||
return properties.equals(that.properties);
|
return properties.equals(that.properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,6 +213,7 @@ public class TriggerEvent implements MapWriter {
|
||||||
result = 31 * result + (int) (eventTime ^ (eventTime >>> 32));
|
result = 31 * result + (int) (eventTime ^ (eventTime >>> 32));
|
||||||
result = 31 * result + eventType.hashCode();
|
result = 31 * result + eventType.hashCode();
|
||||||
result = 31 * result + properties.hashCode();
|
result = 31 * result + properties.hashCode();
|
||||||
|
result = 31 * result + Boolean.hashCode(ignored);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,138 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.time.format.DateTimeFormatterBuilder;
|
||||||
|
import java.time.temporal.ChronoField;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TimeZone;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.core.CoreContainer;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for {@link ScheduledTrigger}
|
||||||
|
*/
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
|
||||||
|
public class ScheduledTriggerTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
|
private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
|
||||||
|
fail("Did not expect the listener to fire on first run!");
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(1)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrigger() throws Exception {
|
||||||
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
|
|
||||||
|
Map<String, Object> properties = createTriggerProperties(new Date().toInstant().toString(), TimeZone.getDefault().getID());
|
||||||
|
|
||||||
|
scheduledTriggerTest(container, properties);
|
||||||
|
|
||||||
|
TimeZone timeZone = TimeZone.getDefault();
|
||||||
|
DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
|
||||||
|
.append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("['T'[HH[:mm[:ss]]]]") //brackets mean optional
|
||||||
|
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
|
||||||
|
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
|
||||||
|
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
|
||||||
|
.toFormatter(Locale.ROOT).withZone(timeZone.toZoneId());
|
||||||
|
properties = createTriggerProperties(dateTimeFormatter.format(Instant.now()), timeZone.getID());
|
||||||
|
scheduledTriggerTest(container, properties);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testIgnoredEvent() throws Exception {
|
||||||
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
|
long threeDaysAgo = new Date().getTime() - TimeUnit.DAYS.toMillis(3);
|
||||||
|
Map<String, Object> properties = createTriggerProperties(new Date(threeDaysAgo).toInstant().toString(),
|
||||||
|
TimeZone.getDefault().getID(),
|
||||||
|
"+2DAYS", "+1HOUR");
|
||||||
|
try (ScheduledTrigger scheduledTrigger = new ScheduledTrigger("sched1", properties,
|
||||||
|
container.getResourceLoader(), container.getZkController().getSolrCloudManager())) {
|
||||||
|
scheduledTrigger.init();
|
||||||
|
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
|
||||||
|
scheduledTrigger.setProcessor(event -> {
|
||||||
|
eventRef.set(event);
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
scheduledTrigger.run();
|
||||||
|
assertTrue(eventRef.get().isIgnored());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void scheduledTriggerTest(CoreContainer container, Map<String, Object> properties) throws IOException, InterruptedException {
|
||||||
|
try (ScheduledTrigger scheduledTrigger = new ScheduledTrigger("sched1", properties,
|
||||||
|
container.getResourceLoader(), container.getZkController().getSolrCloudManager())) {
|
||||||
|
scheduledTrigger.init();
|
||||||
|
scheduledTrigger.setProcessor(noFirstRunProcessor);
|
||||||
|
scheduledTrigger.run();
|
||||||
|
final List<Long> eventTimes = new ArrayList<>();
|
||||||
|
scheduledTrigger.setProcessor(event -> {
|
||||||
|
eventTimes.add(event.getEventTime());
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
Thread.sleep(3000);
|
||||||
|
scheduledTrigger.run();
|
||||||
|
}
|
||||||
|
assertEquals(3, eventTimes.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> createTriggerProperties(String startTime, String timeZone) {
|
||||||
|
return createTriggerProperties(startTime, timeZone, "+3SECOND", "+2SECOND");
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> createTriggerProperties(String startTime, String timeZone, String every, String graceTime) {
|
||||||
|
Map<String, Object> properties = new HashMap<>();
|
||||||
|
properties.put("graceTime", graceTime);
|
||||||
|
properties.put("startTime", startTime);
|
||||||
|
properties.put("timeZone", timeZone);
|
||||||
|
properties.put("every", every);
|
||||||
|
List<Map<String, String>> actions = new ArrayList<>(3);
|
||||||
|
Map<String, String> map = new HashMap<>(2);
|
||||||
|
map.put("name", "compute_plan");
|
||||||
|
map.put("class", "solr.ComputePlanAction");
|
||||||
|
actions.add(map);
|
||||||
|
map = new HashMap<>(2);
|
||||||
|
map.put("name", "execute_plan");
|
||||||
|
map.put("class", "solr.ExecutePlanAction");
|
||||||
|
actions.add(map);
|
||||||
|
properties.put("actions", actions);
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -31,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.AtomicDouble;
|
import com.google.common.util.concurrent.AtomicDouble;
|
||||||
|
@ -1633,4 +1635,66 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||||
assertEquals(5, docCollection.getReplicas().size());
|
assertEquals(5, docCollection.getReplicas().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testScheduledTrigger() throws Exception {
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
|
||||||
|
// this collection will place 2 cores on 1st node and 1 core on 2nd node
|
||||||
|
String collectionName = "testScheduledTrigger";
|
||||||
|
CollectionAdminRequest.createCollection(collectionName, 1, 3)
|
||||||
|
.setMaxShardsPerNode(5).process(solrClient);
|
||||||
|
waitForState("", collectionName, clusterShape(1, 3));
|
||||||
|
|
||||||
|
// create a policy which allows only 1 core per node thereby creating a violation for the above collection
|
||||||
|
String setClusterPolicy = "{\n" +
|
||||||
|
" \"set-cluster-policy\" : [\n" +
|
||||||
|
" {\"cores\" : \"<2\", \"node\" : \"#EACH\"}\n" +
|
||||||
|
" ]\n" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicy);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// start a new node which can be used to balance the cluster as per policy
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
cluster.waitForAllNodes(10);
|
||||||
|
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'sched_trigger_integration1'," +
|
||||||
|
"'event' : 'scheduled'," +
|
||||||
|
"'startTime' : '" + new Date().toInstant().toString() + "'" +
|
||||||
|
"'every' : '+3SECONDS'" +
|
||||||
|
"'actions' : [" +
|
||||||
|
"{'name' : 'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name' : 'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name' : 'recorder', 'class': '" + ContextPropertiesRecorderAction.class.getName() + "'}" +
|
||||||
|
"]}}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
assertTrue("ScheduledTrigger did not fire within 20 seconds", triggerFiredLatch.await(20, TimeUnit.SECONDS));
|
||||||
|
assertEquals(1, events.size());
|
||||||
|
Map<String, Object> actionContextProps = actionContextPropertiesRef.get();
|
||||||
|
assertNotNull(actionContextProps);
|
||||||
|
TriggerEvent event = events.iterator().next();
|
||||||
|
List<SolrRequest> operations = (List<SolrRequest>) actionContextProps.get("operations");
|
||||||
|
assertNotNull(operations);
|
||||||
|
assertEquals(1, operations.size());
|
||||||
|
for (SolrRequest operation : operations) {
|
||||||
|
SolrParams params = operation.getParams();
|
||||||
|
assertEquals(newNode.getNodeName(), params.get("targetNode"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static AtomicReference<Map<String, Object>> actionContextPropertiesRef = new AtomicReference<>();
|
||||||
|
|
||||||
|
public static class ContextPropertiesRecorderAction extends TestEventMarkerAction {
|
||||||
|
@Override
|
||||||
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
|
actionContextPropertiesRef.set(actionContext.getProperties());
|
||||||
|
super.process(event, actionContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
// specific language governing permissions and limitations
|
// specific language governing permissions and limitations
|
||||||
// under the License.
|
// under the License.
|
||||||
|
|
||||||
Triggers are used in autoscaling to watch for cluster events such as nodes joining or leaving.
|
Triggers are used in autoscaling to watch for cluster events such as nodes joining, leaving, search rate or any other metric breaching a threshold.
|
||||||
|
|
||||||
In the future other cluster, node, and replica events that are important from the
|
In the future other cluster, node, and replica events that are important from the
|
||||||
point of view of cluster performance will also have available triggers.
|
point of view of cluster performance will also have available triggers.
|
||||||
|
@ -27,7 +27,7 @@ change that merits attention they generate events, which are then queued and pro
|
||||||
resources (e.g., move replicas). Solr provides predefined implementations of triggers for specific event types.
|
resources (e.g., move replicas). Solr provides predefined implementations of triggers for specific event types.
|
||||||
|
|
||||||
Triggers execute on the node that runs `Overseer`. They are scheduled to run periodically,
|
Triggers execute on the node that runs `Overseer`. They are scheduled to run periodically,
|
||||||
currently at fixed interval of 1 second between each execution (not every execution produces events).
|
currently at default interval of 1 second between each execution (not every execution produces events).
|
||||||
|
|
||||||
== Event Types
|
== Event Types
|
||||||
Currently the following event types (and corresponding trigger implementations) are defined:
|
Currently the following event types (and corresponding trigger implementations) are defined:
|
||||||
|
@ -36,6 +36,7 @@ Currently the following event types (and corresponding trigger implementations)
|
||||||
* `nodeLost`: generated when a node leaves the cluster
|
* `nodeLost`: generated when a node leaves the cluster
|
||||||
* `metric`: generated when the configured metric crosses a configured lower or upper threshold value
|
* `metric`: generated when the configured metric crosses a configured lower or upper threshold value
|
||||||
* `searchRate`: generated when the 1-minute average search rate exceeds configured upper threshold
|
* `searchRate`: generated when the 1-minute average search rate exceeds configured upper threshold
|
||||||
|
* `scheduled` - generated according to a scheduled time period such as every 24 hours etc
|
||||||
|
|
||||||
Events are not necessarily generated immediately after the corresponding state change occurred - the
|
Events are not necessarily generated immediately after the corresponding state change occurred - the
|
||||||
maximum rate of events is controlled by the `waitFor` configuration parameter (see below).
|
maximum rate of events is controlled by the `waitFor` configuration parameter (see below).
|
||||||
|
@ -152,6 +153,23 @@ the threshold rate and the current request rate.
|
||||||
}
|
}
|
||||||
----
|
----
|
||||||
|
|
||||||
|
== Scheduled trigger
|
||||||
|
|
||||||
|
The Scheduled trigger generates events according to a fixed rate schedule.
|
||||||
|
|
||||||
|
The trigger supports the following configuration:
|
||||||
|
|
||||||
|
* `startTime` - (string, required) the start date/time of the schedule. This should either be an ISO-8601 date time string (the same standard used during search and indexing in Solr, thus defaulting to UTC) or be specified with the `timeZone` parameter.
|
||||||
|
* `every` - (string, required) a positive Solr date math string which is added to the `startTime` or the last run time to arrive at the next scheduled time
|
||||||
|
* `graceTime` - (string, optional) a positive Solr date math string. This is the additional grace time over the scheduled time within which the trigger is allowed to generate an event.
|
||||||
|
* `timeZone` - (string, optional) a time zone string which is used for calculating the scheduled times
|
||||||
|
* `preferredOp` - (string, optional, defaults to `MOVEREPLICA`) the preferred operation to perform in response to an event generated by this trigger. The only supported values are `MOVEREPLICA` and `ADDREPLICA`.
|
||||||
|
|
||||||
|
This trigger applies the `every` date math expression on the `startTime` or the last event time to derive the next scheduled time and if current time is greater than next scheduled time but within `graceTime` then an event is generated.
|
||||||
|
|
||||||
|
Apart from the common event properties described in the Event Types section, the trigger adds an additional `actualEventTime` event property which has the actual event time as opposed to the scheduled time.
|
||||||
|
For example, if the scheduled time was `2018-01-31T15:30:00Z` and grace time was `+15MINUTES` then an event may be fired at `2018-01-31T15:45:00Z`. Such an event will have `eventTime` as `2018-01-31T15:30:00Z` i.e. the scheduled time but the `actualEventTime` property will have a value of `2018-01-31T15:45:00Z`.
|
||||||
|
|
||||||
== Trigger Configuration
|
== Trigger Configuration
|
||||||
Trigger configurations are managed using the Autoscaling Write API and the commands `set-trigger`, `remove-trigger`,
|
Trigger configurations are managed using the Autoscaling Write API and the commands `set-trigger`, `remove-trigger`,
|
||||||
`suspend-trigger`, and `resume-trigger`.
|
`suspend-trigger`, and `resume-trigger`.
|
||||||
|
|
|
@ -291,6 +291,13 @@ public class MiniSolrCloudCluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for all Solr nodes to be live
|
||||||
|
*
|
||||||
|
* @param timeout number of seconds to wait before throwing an IllegalStateException
|
||||||
|
* @throws IOException if there was an error communicating with ZooKeeper
|
||||||
|
* @throws InterruptedException if the calling thread is interrupted during the wait operation
|
||||||
|
*/
|
||||||
public void waitForAllNodes(int timeout) throws IOException, InterruptedException {
|
public void waitForAllNodes(int timeout) throws IOException, InterruptedException {
|
||||||
waitForAllNodes(jettys.size(), timeout);
|
waitForAllNodes(jettys.size(), timeout);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue