mirror of https://github.com/apache/lucene.git

SOLR-11670: Implement a periodic house-keeping task.

parent 2bb6b98582
commit b17052e852
@@ -186,6 +186,9 @@ New Features

 * SOLR-11960: Add collection level properties similar to cluster properties (Peter Rusko, Tomás Fernández Löbbe)

+* SOLR-11670: Implement a periodic house-keeping task. This uses a scheduled autoscaling trigger and
+  currently performs cleanup of old inactive shards. (ab, shalin)
+
 Bug Fixes
 ----------------------

@@ -166,9 +166,11 @@ public class AutoScaling {
   }

+  public static final String AUTO_ADD_REPLICAS_TRIGGER_NAME = ".auto_add_replicas";
+
   public static final String AUTO_ADD_REPLICAS_TRIGGER_DSL =
       "    {" +
-      "    'name' : '.auto_add_replicas'," +
+      "    'name' : '" + AUTO_ADD_REPLICAS_TRIGGER_NAME + "'," +
       "    'event' : 'nodeLost'," +
       "    'waitFor' : -1," +
       "    'enabled' : true," +
@@ -185,4 +187,27 @@ public class AutoScaling {
       "    }";

   public static final Map<String, Object> AUTO_ADD_REPLICAS_TRIGGER_PROPS = (Map) Utils.fromJSONString(AUTO_ADD_REPLICAS_TRIGGER_DSL);

+  public static final String SCHEDULED_MAINTENANCE_TRIGGER_NAME = ".scheduled_maintenance";
+
+  public static final String SCHEDULED_MAINTENANCE_TRIGGER_DSL =
+      "    {" +
+      "    'name' : '" + SCHEDULED_MAINTENANCE_TRIGGER_NAME + "'," +
+      "    'event' : 'scheduled'," +
+      "    'startTime' : 'NOW'," +
+      "    'every' : '+1DAY'," +
+      "    'enabled' : true," +
+      "    'actions' : [" +
+      "      {" +
+      "        'name':'inactive_shard_plan'," +
+      "        'class':'solr.InactiveShardPlanAction'" +
+      "      }," +
+      "      {" +
+      "        'name':'execute_plan'," +
+      "        'class':'solr.ExecutePlanAction'" +
+      "      }" +
+      "    ]" +
+      "    }";
+
+  public static final Map<String, Object> SCHEDULED_MAINTENANCE_TRIGGER_PROPS = (Map) Utils.fromJSONString(SCHEDULED_MAINTENANCE_TRIGGER_DSL);
 }
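Note (not part of the patch): the DSL strings above are single-quoted JSON that Utils.fromJSONString parses once at class-load time. A minimal sketch of what the parsed scheduled-maintenance default contains, assuming only the keys visible in the DSL:

    Map<String, Object> props = (Map) Utils.fromJSONString(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_DSL);
    assert ".scheduled_maintenance".equals(props.get("name"));
    assert "scheduled".equals(props.get("event"));
    assert "+1DAY".equals(props.get("every"));    // date math, evaluated relative to 'startTime'
    List<Map<String, Object>> actions = (List<Map<String, Object>>) props.get("actions");
    assert "solr.InactiveShardPlanAction".equals(actions.get(0).get("class"));
    assert "solr.ExecutePlanAction".equals(actions.get(1).get("class"));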
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;

 /**
  * This class is responsible for executing cluster operations read from the {@link ActionContext}'s properties
- * with the key name "operations"
+ * with the key name "operations".
  */
 public class ExecutePlanAction extends TriggerActionBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class checks whether there are shards that have been inactive for a long
+ * time (which usually means they are left-overs from shard splitting) and requests their removal
+ * after their cleanup TTL period elapsed.
+ * <p>Shard delete requests are put into the {@link ActionContext}'s properties
+ * with the key name "operations". The value is a List of SolrRequest objects.</p>
+ */
+public class InactiveShardPlanAction extends TriggerActionBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String TTL_PROP = "ttl";
+
+  public static final int DEFAULT_TTL_SECONDS = 3600 * 24 * 2;
+
+  private int cleanupTTL;
+
+  @Override
+  public void init(Map<String, String> args) {
+    super.init(args);
+    String cleanupStr = args.getOrDefault(TTL_PROP, String.valueOf(DEFAULT_TTL_SECONDS));
+    try {
+      cleanupTTL = Integer.parseInt(cleanupStr);
+    } catch (Exception e) {
+      log.warn("Invalid " + TTL_PROP + " value: '" + cleanupStr + "', using default " + DEFAULT_TTL_SECONDS);
+      cleanupTTL = DEFAULT_TTL_SECONDS;
+    }
+    if (cleanupTTL < 0) {
+      log.warn("Invalid " + TTL_PROP + " value: '" + cleanupStr + "', using default " + DEFAULT_TTL_SECONDS);
+      cleanupTTL = DEFAULT_TTL_SECONDS;
+    }
+  }
+
+  @Override
+  public void process(TriggerEvent event, ActionContext context) throws Exception {
+    SolrCloudManager cloudManager = context.getCloudManager();
+    ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
+    Map<String, List<String>> cleanup = new LinkedHashMap<>();
+    Map<String, List<String>> inactive = new LinkedHashMap<>();
+    state.forEachCollection(coll ->
+      coll.getSlices().forEach(s -> {
+        if (Slice.State.INACTIVE.equals(s.getState())) {
+          inactive.computeIfAbsent(coll.getName(), c -> new ArrayList<>()).add(s.getName());
+          String tstampStr = s.getStr(ZkStateReader.STATE_TIMESTAMP_PROP);
+          if (tstampStr == null || tstampStr.isEmpty()) {
+            return;
+          }
+          long timestamp = Long.parseLong(tstampStr);
+          // this timestamp uses epoch time
+          long currentTime = cloudManager.getTimeSource().getEpochTime();
+          long delta = TimeUnit.NANOSECONDS.toSeconds(currentTime - timestamp);
+          log.debug("{}/{}: tstamp={}, time={}, delta={}", coll.getName(), s.getName(), timestamp, currentTime, delta);
+          if (delta > cleanupTTL) {
+            log.debug("-- delete inactive {} / {}", coll.getName(), s.getName());
+            List<SolrRequest> operations = (List<SolrRequest>)context.getProperties().computeIfAbsent("operations", k -> new ArrayList<>());
+            operations.add(CollectionAdminRequest.deleteShard(coll.getName(), s.getName()));
+            cleanup.computeIfAbsent(coll.getName(), c -> new ArrayList<>()).add(s.getName());
+          }
+        }
+      })
+    );
+    if (!cleanup.isEmpty()) {
+      Map<String, Object> results = new LinkedHashMap<>();
+      results.put("inactive", inactive);
+      results.put("cleanup", cleanup);
+      context.getProperties().put(getName(), results);
+    }
+  }
+}
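Note (outside the patch): STATE_TIMESTAMP_PROP is written in epoch nanoseconds (see the SliceMutator and SimClusterStateProvider hunks below), which is why the delta above goes through TimeUnit.NANOSECONDS.toSeconds before being compared to the ttl, which is configured in seconds. A worked example of the check, with hypothetical values:

    long splitTime = 1_500_000_000_000_000_000L;                      // nanos recorded when the shard went inactive
    long now = splitTime + TimeUnit.DAYS.toNanos(3);                  // three days later
    long deltaSec = TimeUnit.NANOSECONDS.toSeconds(now - splitTime);  // 259200
    boolean expired = deltaSec > 3600 * 24 * 2;                       // true: past the 2-day default TTL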
@@ -121,13 +121,15 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
     int lastZnodeVersion = znodeVersion;

     // we automatically add a trigger for auto add replicas if it does not exists already
+    // we also automatically add a scheduled maintenance trigger
     while (!isClosed) {
       try {
         AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
-        AutoScalingConfig withAutoAddReplicasTrigger = withAutoAddReplicasTrigger(autoScalingConfig);
-        if (withAutoAddReplicasTrigger.equals(autoScalingConfig)) break;
-        log.debug("Adding .autoAddReplicas trigger");
-        cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(withAutoAddReplicasTrigger), withAutoAddReplicasTrigger.getZkVersion());
+        AutoScalingConfig updatedConfig = withAutoAddReplicasTrigger(autoScalingConfig);
+        updatedConfig = withScheduledMaintenanceTrigger(updatedConfig);
+        if (updatedConfig.equals(autoScalingConfig)) break;
+        log.debug("Adding .auto_add_replicas and .scheduled_maintenance triggers");
+        cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(updatedConfig), updatedConfig.getZkVersion());
         break;
       } catch (BadVersionException bve) {
         // somebody else has changed the configuration so we must retry
@@ -338,6 +340,15 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {

   private AutoScalingConfig withAutoAddReplicasTrigger(AutoScalingConfig autoScalingConfig) {
     Map<String, Object> triggerProps = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_PROPS;
+    return withDefaultTrigger(triggerProps, autoScalingConfig);
+  }
+
+  private AutoScalingConfig withScheduledMaintenanceTrigger(AutoScalingConfig autoScalingConfig) {
+    Map<String, Object> triggerProps = AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_PROPS;
+    return withDefaultTrigger(triggerProps, autoScalingConfig);
+  }
+
+  private AutoScalingConfig withDefaultTrigger(Map<String, Object> triggerProps, AutoScalingConfig autoScalingConfig) {
     String triggerName = (String) triggerProps.get("name");
     Map<String, AutoScalingConfig.TriggerConfig> configs = autoScalingConfig.getTriggerConfigs();
     for (AutoScalingConfig.TriggerConfig cfg : configs.values()) {
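In condensed form (a sketch using the names from the two hunks above, not the literal patch), the overseer's seeding loop now reads: merge each default trigger only if it is absent, and skip the ZooKeeper write entirely when nothing changed, since the updated config then compares equal to the current one:

    AutoScalingConfig current = cloudManager.getDistribStateManager().getAutoScalingConfig();
    AutoScalingConfig updated = withAutoAddReplicasTrigger(current);     // no-op if already present
    updated = withScheduledMaintenanceTrigger(updated);                  // likewise
    if (!updated.equals(current)) {
      cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH,
          Utils.toJSON(updated), updated.getZkVersion());
    }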
@@ -28,10 +28,12 @@ import java.util.Date;
 import java.util.Locale;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;

 import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.util.DateMathParser;
 import org.apache.solr.util.TimeZoneUtils;

@@ -143,6 +145,7 @@ public class ScheduledTrigger extends TriggerBase {
       }
     }

+    TimeSource timeSource = cloudManager.getTimeSource();
     DateMathParser dateMathParser = new DateMathParser(timeZone);
     dateMathParser.setNow(new Date(lastRunAt.toEpochMilli()));
     Instant nextRunTime, nextPlusGrace;

@@ -156,7 +159,8 @@ public class ScheduledTrigger extends TriggerBase {
           "Unable to calculate next run time. lastRan: " + lastRunAt.toString() + " and date math string: " + everyStr, e);
     }

-    Instant now = Instant.now(); // todo how to play well with simulation framework?
+    Instant now = Instant.ofEpochMilli(
+        TimeUnit.NANOSECONDS.toMillis(timeSource.getEpochTime()));
     AutoScaling.TriggerEventProcessor processor = processorRef.get();

     if (now.isBefore(nextRunTime)) {

@@ -182,7 +186,7 @@ public class ScheduledTrigger extends TriggerBase {
       log.debug("ScheduledTrigger {} firing registered processor for scheduled time {}, now={}", name,
           nextRunTime, now);
     }
-    if (processor.process(new ScheduledEvent(getEventType(), getName(), nextRunTime.toEpochMilli(),
+    if (processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTime(),
         preferredOp, now.toEpochMilli()))) {
       lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
     }
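The switch away from Instant.now() is what lets a scheduled trigger run under the simulation framework: a simulated TimeSource such as simTime:50 (used by SimCloudManager in the tests below) advances much faster than wall-clock time, so "now" has to come from the same source the schedule is evaluated against. A minimal sketch, assuming only what the hunk above shows (getEpochTime() returns nanoseconds):

    TimeSource timeSource = TimeSource.get("simTime:50");   // or cloudManager.getTimeSource() on a real cluster
    Instant now = Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(timeSource.getEpochTime()));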
@@ -148,6 +148,10 @@ public class ScheduledTriggers implements Closeable {
     if (this.autoScalingConfig != null) {
       currentProps.putAll(this.autoScalingConfig.getProperties());
     }
+
+    // reset listeners early in order to capture first execution of newly scheduled triggers
+    listeners.setAutoScalingConfig(autoScalingConfig);
+
     for (Map.Entry<String, Object> entry : currentProps.entrySet()) {
       Map<String, Object> newProps = autoScalingConfig.getProperties();
       String key = entry.getKey();

@@ -178,12 +182,10 @@ public class ScheduledTriggers implements Closeable {
         }
       }
     }
-    this.autoScalingConfig = autoScalingConfig;

+    this.autoScalingConfig = autoScalingConfig;
     // reset cooldown
     cooldownStart.set(cloudManager.getTimeSource().getTime() - cooldownPeriod.get());
-
-    listeners.setAutoScalingConfig(autoScalingConfig);
   }

   /**
@@ -226,16 +228,17 @@ public class ScheduledTriggers implements Closeable {
       scheduledTriggerWrappers.replace(newTrigger.getName(), triggerWrapper);
     }
     newTrigger.setProcessor(event -> {
+      TriggerListeners triggerListeners = listeners.copy();
       if (cloudManager.isClosed()) {
         String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because Solr has been shutdown.", event.toString());
         log.warn(msg);
-        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
+        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
         return false;
       }
       TriggerWrapper scheduledSource = scheduledTriggerWrappers.get(event.getSource());
       if (scheduledSource == null) {
         String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
-        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, msg);
+        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, msg);
         log.warn(msg);
         return false;
       }

@@ -243,7 +246,7 @@ public class ScheduledTriggers implements Closeable {
       AutoScaling.Trigger source = scheduledSource.trigger;
       if (scheduledSource.isClosed || source.isClosed()) {
         String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
-        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
+        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
         log.warn(msg);
         // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
         return false;

@@ -251,7 +254,7 @@ public class ScheduledTriggers implements Closeable {
       if (event.isIgnored()) {
         log.debug("-------- Ignoring event: " + event);
         event.getProperties().put(TriggerEvent.IGNORED, true);
-        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Event was ignored.");
+        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Event was ignored.");
         return true; // always return true for ignored events
       }
       // even though we pause all triggers during action execution there is a possibility that a trigger was already

@@ -259,7 +262,7 @@ public class ScheduledTriggers implements Closeable {
       if (cooldownStart.get() + cooldownPeriod.get() > cloudManager.getTimeSource().getTime()) {
         log.debug("-------- Cooldown period - rejecting event: " + event);
         event.getProperties().put(TriggerEvent.COOLDOWN, true);
-        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "In cooldown period.");
+        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "In cooldown period.");
         return false;
       } else {
         log.debug("++++++++ Cooldown inactive - processing event: " + event);

@@ -275,12 +278,12 @@ public class ScheduledTriggers implements Closeable {
           enqueued = triggerWrapper.enqueue(event);
         }
         // fire STARTED event listeners after enqueuing the event is successful
-        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.STARTED);
+        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.STARTED);
         List<TriggerAction> actions = source.getActions();
         if (actions != null) {
           if (actionExecutor.isShutdown()) {
             String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s from trigger %s because the executor has already been closed", event.toString(), source);
-            listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
+            triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
             log.warn(msg);
             // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
             return false;

@@ -288,6 +291,7 @@ public class ScheduledTriggers implements Closeable {
           actionExecutor.submit(() -> {
             assert hasPendingActions.get();
             long eventProcessingStart = cloudManager.getTimeSource().getTime();
+            TriggerListeners triggerListeners1 = triggerListeners.copy();
             log.debug("-- processing actions for " + event);
             try {
               // in future, we could wait for pending tasks in a different thread and re-enqueue
@@ -298,22 +302,22 @@ public class ScheduledTriggers implements Closeable {
               for (TriggerAction action : actions) {
                 List<String> beforeActions = (List<String>) actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.BEFORE_ACTION.toString(), k -> new ArrayList<String>());
                 beforeActions.add(action.getName());
-                listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
+                triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
                 try {
                   action.process(event, actionContext);
                 } catch (Exception e) {
-                  listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, action.getName(), actionContext, e, null);
+                  triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, action.getName(), actionContext, e, null);
                   throw new Exception("Error executing action: " + action.getName() + " for trigger event: " + event, e);
                 }
                 List<String> afterActions = (List<String>) actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.AFTER_ACTION.toString(), k -> new ArrayList<String>());
                 afterActions.add(action.getName());
-                listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
+                triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
               }
               if (enqueued) {
                 TriggerEvent ev = triggerWrapper.dequeue();
                 assert ev.getId().equals(event.getId());
               }
-              listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
+              triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
             } catch (Exception e) {
               log.warn("Exception executing actions", e);
             } finally {

@@ -333,7 +337,7 @@ public class ScheduledTriggers implements Closeable {
                 + " is broken! Expected event=" + event + " but got " + ev);
           }
         }
-        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
+        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
         hasPendingActions.set(false);
         // resume triggers now
         resumeTriggers(0);

@@ -341,7 +345,7 @@ public class ScheduledTriggers implements Closeable {
         return true;
       } else {
         // there is an action in the queue and we don't want to enqueue another until it is complete
-        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Already processing another event.");
+        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Already processing another event.");
         return false;
       }
     });

@@ -596,6 +600,27 @@ public class ScheduledTriggers implements Closeable {
     Map<String, TriggerListener> listenersPerName = new HashMap<>();
     ReentrantLock updateLock = new ReentrantLock();

+    public TriggerListeners() {
+
+    }
+
+    private TriggerListeners(Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> listenersPerStage,
+                             Map<String, TriggerListener> listenersPerName) {
+      this.listenersPerStage = new HashMap<>();
+      listenersPerStage.forEach((n, listeners) -> {
+        Map<TriggerEventProcessorStage, List<TriggerListener>> perStage = this.listenersPerStage.computeIfAbsent(n, name -> new HashMap<>());
+        listeners.forEach((s, lst) -> {
+          List<TriggerListener> newLst = perStage.computeIfAbsent(s, stage -> new ArrayList<>());
+          newLst.addAll(lst);
+        });
+      });
+      this.listenersPerName = new HashMap<>(listenersPerName);
+    }
+
+    public TriggerListeners copy() {
+      return new TriggerListeners(listenersPerStage, listenersPerName);
+    }
+
     void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
       updateLock.lock();
       // we will recreate this from scratch
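The new copy() gives each event a consistent snapshot of the listener configuration: the processor captures listeners.copy() once per event, and the action-executor task takes a second copy, so listeners added or removed by a concurrent setAutoScalingConfig cannot change which listeners see a given event mid-flight. A condensed sketch of the pattern (names as in the hunks above):

    TriggerListeners snapshot = listeners.copy();   // deep-copies the per-stage listener lists
    // a concurrent listeners.setAutoScalingConfig(...) rebuilds the live maps,
    // but this event keeps firing against the snapshot it started with
    snapshot.fireListeners(event.getSource(), event, TriggerEventProcessorStage.STARTED);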
@@ -178,6 +178,8 @@ public class SliceMutator {
           props.remove("shard_parent_zk_session");
         }
         props.put(ZkStateReader.STATE_PROP, message.getStr(key));
+        // we need to use epoch time so that it's comparable across Overseer restarts
+        props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(dataProvider.getTimeSource().getEpochTime()));
         Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
         slicesCopy.put(slice.getName(), newSlice);
       }
@@ -40,6 +40,11 @@ public class ActionThrottleTest extends SolrTestCaseJ4 {
       return returnValues.get(index++);
     }

+    @Override
+    public long getEpochTime() {
+      return getTime();
+    }
+
     @Override
     public void sleep(long ms) throws InterruptedException {
       throw new UnsupportedOperationException();
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Some useful methods for SolrCloud tests.
+ */
+public class CloudTestUtils {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final int DEFAULT_TIMEOUT = 90;
+
+  /**
+   * Wait for a particular collection state to appear.
+   *
+   * This is a convenience method using the {@link #DEFAULT_TIMEOUT}
+   *
+   * @param cloudManager current instance of {@link SolrCloudManager}
+   * @param message a message to report on failure
+   * @param collection the collection to watch
+   * @param predicate a predicate to match against the collection state
+   */
+  public static long waitForState(final SolrCloudManager cloudManager,
+                                  final String message,
+                                  final String collection,
+                                  final CollectionStatePredicate predicate) {
+    AtomicReference<DocCollection> state = new AtomicReference<>();
+    AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
+    try {
+      return waitForState(cloudManager, collection, DEFAULT_TIMEOUT, TimeUnit.SECONDS, (n, c) -> {
+        state.set(c);
+        liveNodesLastSeen.set(n);
+        return predicate.matches(n, c);
+      });
+    } catch (Exception e) {
+      throw new AssertionError(message + "\n" + "Live Nodes: " + liveNodesLastSeen.get() + "\nLast available state: " + state.get(), e);
+    }
+  }
+
+  /**
+   * Wait for a particular collection state to appear.
+   *
+   * This is a convenience method using the {@link #DEFAULT_TIMEOUT}
+   *
+   * @param cloudManager current instance of {@link SolrCloudManager}
+   * @param collection the collection to watch
+   * @param wait timeout value
+   * @param unit timeout unit
+   * @param predicate a predicate to match against the collection state
+   */
+  public static long waitForState(final SolrCloudManager cloudManager,
+                                  final String collection,
+                                  long wait,
+                                  final TimeUnit unit,
+                                  final CollectionStatePredicate predicate) throws InterruptedException, TimeoutException, IOException {
+    TimeOut timeout = new TimeOut(wait, unit, cloudManager.getTimeSource());
+    long timeWarn = timeout.timeLeft(TimeUnit.MILLISECONDS) / 4;
+    while (!timeout.hasTimedOut()) {
+      ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
+      DocCollection coll = state.getCollectionOrNull(collection);
+      // due to the way we manage collections in SimClusterStateProvider a null here
+      // can mean that a collection is still being created but has no replicas
+      if (coll == null) { // does not yet exist?
+        timeout.sleep(50);
+        continue;
+      }
+      if (predicate.matches(state.getLiveNodes(), coll)) {
+        log.trace("-- predicate matched with state {}", state);
+        return timeout.timeElapsed(TimeUnit.MILLISECONDS);
+      }
+      timeout.sleep(50);
+      if (timeout.timeLeft(TimeUnit.MILLISECONDS) < timeWarn) {
+        log.trace("-- still not matching predicate: {}", state);
+      }
+    }
+    throw new TimeoutException();
+  }
+
+  /**
+   * Return a {@link CollectionStatePredicate} that returns true if a collection has the expected
+   * number of shards and replicas
+   */
+  public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
+    return (liveNodes, collectionState) -> {
+      if (collectionState == null)
+        return false;
+      if (collectionState.getSlices().size() != expectedShards)
+        return false;
+      for (Slice slice : collectionState) {
+        int activeReplicas = 0;
+        for (Replica replica : slice) {
+          if (replica.isActive(liveNodes))
+            activeReplicas++;
+        }
+        if (activeReplicas != expectedReplicas)
+          return false;
+      }
+      return true;
+    };
+  }
+}
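Typical usage of these helpers, as exercised by the test below (collection name illustrative): create or mutate a collection, then block until the cluster reports the expected shape.

    CloudTestUtils.waitForState(cloudManager, "failed to create test_collection", "test_collection",
        CloudTestUtils.clusterShape(1, 1));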
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.LogLevel;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+/**
+ *
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
+public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static SolrCloudManager cloudManager;
+  private static SolrClient solrClient;
+  private static TimeSource timeSource;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    if (random().nextBoolean()) {
+      cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+      solrClient = cluster.getSolrClient();
+    } else {
+      cloudManager = SimCloudManager.createCluster(1, TimeSource.get("simTime:50"));
+      solrClient = ((SimCloudManager)cloudManager).simGetSolrClient();
+    }
+    timeSource = cloudManager.getTimeSource();
+  }
+
+  @After
+  public void restoreDefaults() throws Exception {
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST,
+        "{'set-trigger' : " + AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_DSL + "}");
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+    if (autoScalingConfig.getTriggerListenerConfigs().containsKey("foo")) {
+      String cmd = "{" +
+          "'remove-listener' : {'name' : 'foo'}" +
+          "}";
+      response = solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, cmd));
+      assertEquals(response.get("result").toString(), "success");
+    }
+  }
+
+  @AfterClass
+  public static void teardown() throws Exception {
+    if (cloudManager instanceof SimCloudManager) {
+      cloudManager.close();
+    }
+    solrClient = null;
+    cloudManager = null;
+  }
+
+  @Test
+  public void testTriggerDefaults() throws Exception {
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+    log.info(autoScalingConfig.toString());
+    AutoScalingConfig.TriggerConfig triggerConfig = autoScalingConfig.getTriggerConfigs().get(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME);
+    assertNotNull(triggerConfig);
+    assertEquals(2, triggerConfig.actions.size());
+    assertTrue(triggerConfig.actions.get(0).actionClass.endsWith(InactiveShardPlanAction.class.getSimpleName()));
+    assertTrue(triggerConfig.actions.get(1).actionClass.endsWith(ExecutePlanAction.class.getSimpleName()));
+    AutoScalingConfig.TriggerListenerConfig listenerConfig = autoScalingConfig.getTriggerListenerConfigs().get(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + ".system");
+    assertNotNull(listenerConfig);
+    assertEquals(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME, listenerConfig.trigger);
+    assertTrue(listenerConfig.listenerClass.endsWith(SystemLogListener.class.getSimpleName()));
+  }
+
+  static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
+  static CountDownLatch listenerCreated = new CountDownLatch(1);
+
+  public static class CapturingTriggerListener extends TriggerListenerBase {
+    @Override
+    public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+      super.init(cloudManager, config);
+      listenerCreated.countDown();
+    }
+
+    @Override
+    public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+                                     ActionContext context, Throwable error, String message) {
+      List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
+      CapturedEvent ev = new CapturedEvent(timeSource.getTime(), context, config, stage, actionName, event, message);
+      log.info("=======> " + ev);
+      lst.add(ev);
+    }
+  }
+
+  static CountDownLatch triggerFired = new CountDownLatch(1);
+
+  public static class TestTriggerAction extends TriggerActionBase {
+
+    @Override
+    public void process(TriggerEvent event, ActionContext context) throws Exception {
+      if (context.getProperties().containsKey("inactive_shard_plan")) {
+        triggerFired.countDown();
+      }
+    }
+  }
+
+  @Test
+  public void testInactiveShardCleanup() throws Exception {
+    String collection1 = getClass().getSimpleName() + "_collection1";
+    CollectionAdminRequest.Create create1 = CollectionAdminRequest.createCollection(collection1,
+        "conf", 1, 1);
+
+    create1.process(solrClient);
+    CloudTestUtils.waitForState(cloudManager, "failed to create " + collection1, collection1,
+        CloudTestUtils.clusterShape(1, 1));
+
+    CollectionAdminRequest.SplitShard split1 = CollectionAdminRequest.splitShard(collection1)
+        .setShardName("shard1");
+    split1.process(solrClient);
+    CloudTestUtils.waitForState(cloudManager, "failed to split " + collection1, collection1,
+        CloudTestUtils.clusterShape(3, 1));
+
+    String setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'foo'," +
+        "'trigger' : '" + AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + "'," +
+        "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
+        "'beforeAction' : 'inactive_shard_plan'," +
+        "'afterAction' : 'inactive_shard_plan'," +
+        "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : '" + AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + "'," +
+        "'event' : 'scheduled'," +
+        "'startTime' : 'NOW+3SECONDS'," +
+        "'every' : '+2SECONDS'," +
+        "'enabled' : true," +
+        "'actions' : [{'name' : 'inactive_shard_plan', 'class' : 'solr.InactiveShardPlanAction', 'ttl' : '10'}," +
+        "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}," +
+        "{'name' : 'test', 'class' : '" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    boolean await = listenerCreated.await(10, TimeUnit.SECONDS);
+    assertTrue("listener not created in time", await);
+    await = triggerFired.await(60, TimeUnit.SECONDS);
+    assertTrue("cleanup action didn't run", await);
+
+    // cleanup should have occurred
+    assertFalse("no events captured!", listenerEvents.isEmpty());
+    List<CapturedEvent> events = new ArrayList<>(listenerEvents.get("foo"));
+    listenerEvents.clear();
+
+    assertFalse(events.isEmpty());
+    int inactiveEvents = 0;
+    CapturedEvent ce = null;
+    for (CapturedEvent e : events) {
+      if (e.stage != TriggerEventProcessorStage.AFTER_ACTION) {
+        continue;
+      }
+      if (e.context.containsKey("properties.inactive_shard_plan")) {
+        ce = e;
+        break;
+      } else {
+        inactiveEvents++;
+      }
+    }
+    assertTrue("should be at least one inactive event", inactiveEvents > 0);
+    assertNotNull("missing cleanup event", ce);
+    Map<String, Object> map = (Map<String, Object>)ce.context.get("properties.inactive_shard_plan");
+    assertNotNull(map);
+
+    Map<String, List<String>> inactive = (Map<String, List<String>>)map.get("inactive");
+    assertEquals(1, inactive.size());
+    assertNotNull(inactive.get(collection1));
+    Map<String, List<String>> cleanup = (Map<String, List<String>>)map.get("cleanup");
+    assertEquals(1, cleanup.size());
+    assertNotNull(cleanup.get(collection1));
+
+    ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
+
+    CloudTestUtils.clusterShape(2, 1).matches(state.getLiveNodes(), state.getCollection(collection1));
+  }
+}
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;

 import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
@@ -105,6 +106,14 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
     zkStateReader = cluster.getSolrClient().getZkStateReader();
+    // disable .scheduled_maintenance
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    SolrClient solrClient = cluster.getSolrClient();
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
   }

   private static CountDownLatch getTriggerFiredLatch() {
@@ -496,6 +496,7 @@ public class SimCloudManager implements SolrCloudManager {
     SolrQueryResponse queryResponse = new SolrQueryResponse();
     autoScalingHandler.handleRequest(queryRequest, queryResponse);
     if (queryResponse.getException() != null) {
+      LOG.debug("-- exception handling request", queryResponse.getException());
       throw new IOException(queryResponse.getException());
     }
     SolrResponse rsp = new SolrResponseBase();
@@ -608,6 +609,13 @@ public class SimCloudManager implements SolrCloudManager {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
         }
         break;
+      case DELETESHARD:
+        try {
+          clusterStateProvider.simDeleteShard(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+        } catch (Exception e) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+        }
+        break;
       default:
         throw new UnsupportedOperationException("Unsupported collection admin action=" + action + " in request: " + req.getParams());
     }
@@ -358,13 +358,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    * @param runLeaderElection if true then run a leader election after adding the replica.
    */
   public void simAddReplica(String nodeId, ReplicaInfo replicaInfo, boolean runLeaderElection) throws Exception {
-    // make sure coreNodeName is unique across cluster
+    // make sure SolrCore name is unique across cluster and coreNodeName within collection
     for (Map.Entry<String, List<ReplicaInfo>> e : nodeReplicaMap.entrySet()) {
       for (ReplicaInfo ri : e.getValue()) {
         if (ri.getCore().equals(replicaInfo.getCore())) {
           throw new Exception("Duplicate SolrCore name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
         }
-        if (ri.getName().equals(replicaInfo.getName())) {
+        if (ri.getName().equals(replicaInfo.getName()) && ri.getCollection().equals(replicaInfo.getCollection())) {
           throw new Exception("Duplicate coreNode name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
         }
       }
     }
@@ -857,6 +857,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    * @param results operation results.
    */
   public void simSplitShard(ZkNodeProps message, NamedList results) throws Exception {
+    if (message.getStr(CommonAdminParams.ASYNC) != null) {
+      results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
+    }
     String collectionName = message.getStr(COLLECTION_PROP);
     AtomicReference<String> sliceName = new AtomicReference<>();
     sliceName.set(message.getStr(SHARD_ID_PROP));
@@ -871,20 +874,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());

     SplitShardCmd.fillRanges(cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames);
-    // mark the old slice as inactive
-    sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
-        .computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>())
-        .put(ZkStateReader.SHARD_STATE_PROP, Slice.State.INACTIVE.toString());
-    // add slice props
-    for (int i = 0; i < subRanges.size(); i++) {
-      String subSlice = subSlices.get(i);
-      DocRouter.Range range = subRanges.get(i);
-      Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
-          .computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
-      sliceProps.put(Slice.RANGE, range);
-      sliceProps.put(Slice.PARENT, sliceName.get());
-      sliceProps.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.ACTIVE.toString());
-    }
     // add replicas for new subShards
     int repFactor = parentSlice.getReplicas().size();
     List<ReplicaPosition> replicaPositions = Assign.identifyNodes(cloudManager,
@@ -911,6 +900,22 @@ public class SimClusterStateProvider implements ClusterStateProvider {
           solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
       simAddReplica(replicaPosition.node, ri, false);
     }
+    // mark the old slice as inactive
+    Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+        .computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
+    props.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
+    props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTime()));
+    // add slice props
+    for (int i = 0; i < subRanges.size(); i++) {
+      String subSlice = subSlices.get(i);
+      DocRouter.Range range = subRanges.get(i);
+      Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+          .computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
+      sliceProps.put(Slice.RANGE, range);
+      sliceProps.put(Slice.PARENT, sliceName.get());
+      sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
+      sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTime()));
+    }
     simRunLeaderElection(Collections.singleton(collectionName), true);
     results.add("success", "");
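Recording ZkStateReader.STATE_TIMESTAMP_PROP here is what lets the new scheduled maintenance trigger age out inactive parent shards later. A rough sketch of how such an age check could be derived from the property (the two-day TTL and the Map<String, Object> sliceProps variable are assumptions for illustration; epoch time is in nanoseconds, per TimeSource.getEpochTime() further down in this commit):

    String stamp = (String) sliceProps.get(ZkStateReader.STATE_TIMESTAMP_PROP);
    long createdNs = Long.parseLong(stamp);
    long nowNs = cloudManager.getTimeSource().getEpochTime();
    long ageSeconds = TimeUnit.NANOSECONDS.toSeconds(nowNs - createdNs);
    boolean inactive = Slice.State.INACTIVE.toString()
        .equals(sliceProps.get(ZkStateReader.STATE_PROP));
    boolean eligibleForCleanup = inactive && ageSeconds > TimeUnit.DAYS.toSeconds(2); // hypothetical TTL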
@@ -922,6 +927,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    * @param results operation results
    */
   public void simDeleteShard(ZkNodeProps message, NamedList results) throws Exception {
+    if (message.getStr(CommonAdminParams.ASYNC) != null) {
+      results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
+    }
     String collectionName = message.getStr(COLLECTION_PROP);
     String sliceName = message.getStr(SHARD_ID_PROP);
     ClusterState clusterState = getClusterState();
@@ -26,16 +26,11 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;

 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStatePredicate;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
@@ -43,7 +38,6 @@ import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
-import org.apache.solr.util.TimeOut;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -64,8 +58,6 @@ import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_P
 public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

-  public static final int DEFAULT_TIMEOUT = 90;
-
   /** The cluster. */
   protected static SimCloudManager cluster;
   protected static int clusterNodeCount = 0;
@@ -198,92 +190,6 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
     return cluster.getClusterStateProvider().getClusterState().getCollection(collectionName);
   }

-  /**
-   * Wait for a particular collection state to appear in the cluster client's state reader
-   *
-   * This is a convenience method using the {@link #DEFAULT_TIMEOUT}
-   *
-   * @param message a message to report on failure
-   * @param collection the collection to watch
-   * @param predicate a predicate to match against the collection state
-   */
-  protected long waitForState(String message, String collection, CollectionStatePredicate predicate) {
-    AtomicReference<DocCollection> state = new AtomicReference<>();
-    AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
-    try {
-      return waitForState(collection, DEFAULT_TIMEOUT, TimeUnit.SECONDS, (n, c) -> {
-        state.set(c);
-        liveNodesLastSeen.set(n);
-        return predicate.matches(n, c);
-      });
-    } catch (Exception e) {
-      throw new AssertionError(message + "\n" + "Live Nodes: " + liveNodesLastSeen.get() + "\nLast available state: " + state.get(), e);
-    }
-  }
-
-  /**
-   * Block until a CollectionStatePredicate returns true, or the wait times out
-   *
-   * Note that the predicate may be called again even after it has returned true, so
-   * implementors should avoid changing state within the predicate call itself.
-   *
-   * @param collection the collection to watch
-   * @param wait how long to wait
-   * @param unit the units of the wait parameter
-   * @param predicate the predicate to call on state changes
-   * @return number of milliseconds elapsed
-   * @throws InterruptedException on interrupt
-   * @throws TimeoutException on timeout
-   * @throws IOException on watcher register / unregister error
-   */
-  public long waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
-      throws InterruptedException, TimeoutException, IOException {
-    TimeOut timeout = new TimeOut(wait, unit, cluster.getTimeSource());
-    long timeWarn = timeout.timeLeft(TimeUnit.MILLISECONDS) / 4;
-    while (!timeout.hasTimedOut()) {
-      ClusterState state = cluster.getClusterStateProvider().getClusterState();
-      DocCollection coll = state.getCollectionOrNull(collection);
-      // due to the way we manage collections in SimClusterStateProvider a null here
-      // can mean that a collection is still being created but has no replicas
-      if (coll == null) { // does not yet exist?
-        timeout.sleep(50);
-        continue;
-      }
-      if (predicate.matches(state.getLiveNodes(), coll)) {
-        log.trace("-- predicate matched with state {}", state);
-        return timeout.timeElapsed(TimeUnit.MILLISECONDS);
-      }
-      timeout.sleep(50);
-      if (timeout.timeLeft(TimeUnit.MILLISECONDS) < timeWarn) {
-        log.trace("-- still not matching predicate: {}", state);
-      }
-    }
-    throw new TimeoutException();
-  }
-
-  /**
-   * Return a {@link CollectionStatePredicate} that returns true if a collection has the expected
-   * number of shards and replicas
-   */
-  public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
-    return (liveNodes, collectionState) -> {
-      if (collectionState == null)
-        return false;
-      if (collectionState.getSlices().size() != expectedShards)
-        return false;
-      for (Slice slice : collectionState) {
-        int activeReplicas = 0;
-        for (Replica replica : slice) {
-          if (replica.isActive(liveNodes))
-            activeReplicas++;
-        }
-        if (activeReplicas != expectedReplicas)
-          return false;
-      }
-      return true;
-    };
-  }
-
   /**
    * Get a (reproducibly) random shard from a {@link DocCollection}
    */
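The helpers deleted above do not disappear: the test hunks that follow call them as CloudTestUtils.waitForState(cluster, ...) and CloudTestUtils.clusterShape(...). The CloudTestUtils hunk itself is not part of this excerpt, so the following is only a sketch of the relocated helpers, pieced together from the removed bodies and the new call sites (the class name used here and the exact signatures may differ from the real class):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
    import org.apache.solr.common.cloud.ClusterState;
    import org.apache.solr.common.cloud.CollectionStatePredicate;
    import org.apache.solr.common.cloud.DocCollection;
    import org.apache.solr.common.cloud.Replica;
    import org.apache.solr.common.cloud.Slice;
    import org.apache.solr.util.TimeOut;

    public class CloudTestUtilsSketch {
      public static final int DEFAULT_TIMEOUT = 90; // seconds, carried over from the removed constant

      // Convenience overload, as suggested by calls like
      // CloudTestUtils.waitForState(cluster, "message", collection, predicate).
      public static long waitForState(SolrCloudManager manager, String message,
                                      String collection, CollectionStatePredicate predicate) {
        try {
          return waitForState(manager, collection, DEFAULT_TIMEOUT, TimeUnit.SECONDS, predicate);
        } catch (Exception e) {
          throw new AssertionError(message, e);
        }
      }

      // Same polling loop as the removed method, parameterized by SolrCloudManager so it
      // can serve both simulated and real clusters.
      public static long waitForState(SolrCloudManager manager, String collection,
                                      long wait, TimeUnit unit, CollectionStatePredicate predicate)
          throws InterruptedException, TimeoutException, IOException {
        TimeOut timeout = new TimeOut(wait, unit, manager.getTimeSource());
        while (!timeout.hasTimedOut()) {
          ClusterState state = manager.getClusterStateProvider().getClusterState();
          DocCollection coll = state.getCollectionOrNull(collection);
          if (coll != null && predicate.matches(state.getLiveNodes(), coll)) {
            return timeout.timeElapsed(TimeUnit.MILLISECONDS);
          }
          timeout.sleep(50);
        }
        throw new TimeoutException();
      }

      public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
        return (liveNodes, collectionState) -> {
          if (collectionState == null || collectionState.getSlices().size() != expectedShards) {
            return false;
          }
          for (Slice slice : collectionState) {
            int active = 0;
            for (Replica replica : slice) {
              if (replica.isActive(liveNodes)) active++;
            }
            if (active != expectedReplicas) return false;
          }
          return true;
        };
      }
    }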
@@ -32,6 +32,7 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
 import org.apache.solr.cloud.autoscaling.ActionContext;
 import org.apache.solr.cloud.autoscaling.ComputePlanAction;
 import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
@@ -141,8 +142,8 @@ public class TestComputePlanAction extends SimSolrCloudTestCase {
         "conf",1, 2);
     create.process(solrClient);

-    waitForState("Timed out waiting for replicas of new collection to be active",
-        "testNodeLost", clusterShape(1, 2));
+    CloudTestUtils.waitForState(cluster, "Timed out waiting for replicas of new collection to be active",
+        "testNodeLost", CloudTestUtils.clusterShape(1, 2));

     ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
     log.debug("-- cluster state: {}", clusterState);
@@ -204,8 +205,8 @@ public class TestComputePlanAction extends SimSolrCloudTestCase {
     // create.setMaxShardsPerNode(2);
     create.process(solrClient);

-    waitForState("Timed out waiting for replicas of new collection to be active",
-        "testNodeWithMultipleReplicasLost", clusterShape(2, 3));
+    CloudTestUtils.waitForState(cluster, "Timed out waiting for replicas of new collection to be active",
+        "testNodeWithMultipleReplicasLost", CloudTestUtils.clusterShape(2, 3));

     ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
     log.debug("-- cluster state: {}", clusterState);
@@ -281,7 +282,7 @@ public class TestComputePlanAction extends SimSolrCloudTestCase {
         "conf",1, 2);
     create.process(solrClient);

-    waitForState("Timed out waiting for replicas of new collection to be active",
+    CloudTestUtils.waitForState(cluster, "Timed out waiting for replicas of new collection to be active",
         "testNodeAdded", (liveNodes, collectionState) -> collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes)));

     // reset to the original policy which has only 1 replica per shard per node
@@ -32,6 +32,7 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
 import org.apache.solr.cloud.autoscaling.ActionContext;
 import org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest;
 import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
@@ -86,7 +87,8 @@ public class TestExecutePlanAction extends SimSolrCloudTestCase {
     create.setMaxShardsPerNode(1);
     create.process(solrClient);

-    log.info("Collection ready after " + waitForState(collectionName, 120, TimeUnit.SECONDS, clusterShape(1, 2)) + "ms");
+    log.info("Collection ready after " + CloudTestUtils.waitForState(cluster, collectionName, 120, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 2)) + "ms");

     String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
     ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
@@ -147,7 +149,8 @@ public class TestExecutePlanAction extends SimSolrCloudTestCase {
       assertNotNull(response.get("success"));
     }

-    log.info("Collection ready after " + waitForState(collectionName, 300, TimeUnit.SECONDS, clusterShape(1, 2)) + "ms");
+    log.info("Collection ready after " + CloudTestUtils.waitForState(cluster, collectionName, 300, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 2)) + "ms");
   }

   @Test
@@ -173,8 +176,8 @@ public class TestExecutePlanAction extends SimSolrCloudTestCase {
     create.setMaxShardsPerNode(1);
     create.process(solrClient);

-    waitForState("Timed out waiting for replicas of new collection to be active",
-        collectionName, clusterShape(1, 2));
+    CloudTestUtils.waitForState(cluster, "Timed out waiting for replicas of new collection to be active",
+        collectionName, CloudTestUtils.clusterShape(1, 2));

     String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
     ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
@@ -190,8 +193,8 @@ public class TestExecutePlanAction extends SimSolrCloudTestCase {

     cluster.simRemoveNode(sourceNodeName, false);

-    waitForState("Timed out waiting for replicas of collection to be 2 again",
-        collectionName, clusterShape(1, 2));
+    CloudTestUtils.waitForState(cluster, "Timed out waiting for replicas of collection to be 2 again",
+        collectionName, CloudTestUtils.clusterShape(1, 2));

     clusterState = cluster.getClusterStateProvider().getClusterState();
     docCollection = clusterState.getCollection(collectionName);
@@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
 import org.apache.solr.cloud.autoscaling.ActionContext;
 import org.apache.solr.cloud.autoscaling.ComputePlanAction;
 import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
@@ -91,6 +92,14 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
     triggerFiredCount.set(0);
     triggerFiredLatch = new CountDownLatch(1);
     listenerEvents.clear();
+    // disable .scheduled_maintenance
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    SolrClient solrClient = cluster.simGetSolrClient();
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
   }

   public static class TestTriggerListener extends TriggerListenerBase {
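Because .scheduled_maintenance is now created automatically, tests that count trigger events suspend it up front, as above, so the house-keeping trigger's own events do not pollute the counters. For completeness, the inverse autoscaling command follows the same shape (a sketch reusing the request helper above):

    String resumeTriggerCommand = "{" +
        "'resume-trigger' : {'name' : '.scheduled_maintenance'}" +
        "}";
    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
    NamedList<Object> response = cluster.simGetSolrClient().request(req);
    assertEquals(response.get("result").toString(), "success");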
@@ -170,7 +179,8 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
     create.setCreateNodeSet(String.join(",", nodes));
     create.process(solrClient);

-    log.info("Ready after " + waitForState(collectionName, 30 * nodes.size(), TimeUnit.SECONDS, clusterShape(5, 15)) + "ms");
+    log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 30 * nodes.size(), TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(5, 15)) + "ms");

     int KILL_NODES = 8;
     // kill off a number of nodes
@@ -178,7 +188,8 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
       cluster.simRemoveNode(nodes.get(i), false);
     }
     // should fully recover
-    log.info("Ready after " + waitForState(collectionName, 90 * KILL_NODES, TimeUnit.SECONDS, clusterShape(5, 15)) + "ms");
+    log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 90 * KILL_NODES, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(5, 15)) + "ms");

     log.info("OP COUNTS: " + cluster.simGetOpCounts());
     long moveReplicaOps = cluster.simGetOpCount(CollectionParams.CollectionAction.MOVEREPLICA.name());
@@ -199,7 +210,8 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
       }
     }

-    log.info("Ready after " + waitForState(collectionName, 30 * nodes.size(), TimeUnit.SECONDS, clusterShape(5, 15)) + "ms");
+    log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 30 * nodes.size(), TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(5, 15)) + "ms");
     long newMoveReplicaOps = cluster.simGetOpCount(CollectionParams.CollectionAction.MOVEREPLICA.name());
     log.info("==== Flaky replicas: {}. Additional MOVEREPLICA count: {}", flakyReplicas, (newMoveReplicaOps - moveReplicaOps));
     // flaky nodes lead to a number of MOVEREPLICA that is non-zero but lower than the number of flaky replicas
@@ -219,7 +231,8 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
         "'enabled' : true," +
         "'actions' : [" +
         "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
-        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
         "]" +
         "}}";
     SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
@@ -229,12 +242,13 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
     // create a collection with more than 1 replica per node
     String collectionName = "testNodeAdded";
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
-        "conf", NUM_NODES / 10, NUM_NODES / 10, NUM_NODES / 10, NUM_NODES / 10);
+        "conf", NUM_NODES / 10, NUM_NODES / 8, NUM_NODES / 8, NUM_NODES / 8);
     create.setMaxShardsPerNode(5);
     create.setAutoAddReplicas(false);
     create.process(solrClient);

-    log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 10, NUM_NODES / 10 * 3)) + " ms");
+    log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 20 * NUM_NODES, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(NUM_NODES / 10, NUM_NODES / 8 * 3)) + " ms");

     int numAddNode = NUM_NODES / 5;
     List<String> addNodesList = new ArrayList<>(numAddNode);
@@ -242,6 +256,9 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
       addNodesList.add(cluster.simAddNode());
       cluster.getTimeSource().sleep(5000);
     }
+    boolean await = triggerFiredLatch.await(1000000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("trigger did not fire", await);

     List<SolrInputDocument> systemColl = cluster.simGetSystemCollection();
     int startedEventPos = -1;
     for (int i = 0; i < systemColl.size(); i++) {
@@ -261,7 +278,8 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
     // make sure some replicas have been moved
     assertTrue("no MOVEREPLICA ops?", cluster.simGetOpCount("MOVEREPLICA") > 0);

-    log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 10, NUM_NODES / 10 * 3)) + " ms");
+    log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 20 * NUM_NODES, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(NUM_NODES / 10, NUM_NODES / 8 * 3)) + " ms");

     int count = 50;
     SolrInputDocument finishedEvent = null;
@@ -381,9 +399,9 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
         "'waitFor' : '" + waitFor + "s'," +
         "'enabled' : true," +
         "'actions' : [" +
-        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}," +
         "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
-        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
         "]" +
         "}}";
     SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
@@ -398,7 +416,8 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
     create.setAutoAddReplicas(false);
     create.process(solrClient);

-    log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 5, NUM_NODES / 10)) + " ms");
+    log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 20 * NUM_NODES, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(NUM_NODES / 5, NUM_NODES / 10)) + " ms");

     // start killing nodes
     int numNodes = NUM_NODES / 5;
@@ -410,7 +429,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
       cluster.getTimeSource().sleep(killDelay);
     }
     // wait for the trigger to fire
-    boolean await = triggerFiredLatch.await(10 * waitFor * 1000 / SPEED, TimeUnit.MILLISECONDS);
+    boolean await = triggerFiredLatch.await(20 * waitFor * 1000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("trigger did not fire within timeout, " +
         "waitFor=" + waitFor + ", killDelay=" + killDelay + ", minIgnored=" + minIgnored,
         await);
@@ -453,7 +472,8 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
         "waitFor=" + waitFor + ", killDelay=" + killDelay + ", minIgnored=" + minIgnored,
         cluster.simGetOpCount("MOVEREPLICA") > 0);

-    log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 5, NUM_NODES / 10)) + " ms");
+    log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 20 * NUM_NODES, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(NUM_NODES / 5, NUM_NODES / 10)) + " ms");

     int count = 50;
     SolrInputDocument finishedEvent = null;
@@ -502,7 +522,8 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
         "conf", 2, 10);
     create.process(solrClient);

-    log.info("Ready after " + waitForState(collectionName, 300, TimeUnit.SECONDS, clusterShape(2, 10)) + " ms");
+    log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 300, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(2, 10)) + " ms");

     // collect the node names for shard1
     Set<String> nodes = new HashSet<>();
@@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.Row;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
 import org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
@@ -67,7 +68,8 @@ public class TestPolicyCloud extends SimSolrCloudTestCase {
     CollectionAdminRequest.createCollection("perReplicaDataColl", "conf", 1, 5)
         .process(solrClient);

-    waitForState("Timeout waiting for collection to become active", "perReplicaDataColl", clusterShape(1, 5));
+    CloudTestUtils.waitForState(cluster, "Timeout waiting for collection to become active", "perReplicaDataColl",
+        CloudTestUtils.clusterShape(1, 5));
     DocCollection coll = getCollectionState("perReplicaDataColl");
     String autoScaleJson = "{" +
         "  'cluster-preferences': [" +
@@ -116,12 +118,13 @@ public class TestPolicyCloud extends SimSolrCloudTestCase {
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1)
         .setPolicy("c1")
         .process(solrClient);
-    waitForState("Timeout waiting for collection to become active", collectionName, clusterShape(1, 1));
+    CloudTestUtils.waitForState(cluster, "Timeout waiting for collection to become active", collectionName,
+        CloudTestUtils.clusterShape(1, 1));

     getCollectionState(collectionName).forEachReplica((s, replica) -> assertEquals(nodeId, replica.getNodeName()));

     CollectionAdminRequest.addReplicaToShard(collectionName, "shard1").process(solrClient);
-    waitForState("Timed out waiting to see 2 replicas for collection: " + collectionName,
+    CloudTestUtils.waitForState(cluster, "Timed out waiting to see 2 replicas for collection: " + collectionName,
         collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 2);

     getCollectionState(collectionName).forEachReplica((s, replica) -> assertEquals(nodeId, replica.getNodeName()));
@@ -148,7 +151,8 @@ public class TestPolicyCloud extends SimSolrCloudTestCase {
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
         .setPolicy("c1")
         .process(solrClient);
-    waitForState("Timeout waiting for collection to become active", collectionName, clusterShape(1, 2));
+    CloudTestUtils.waitForState(cluster, "Timeout waiting for collection to become active", collectionName,
+        CloudTestUtils.clusterShape(1, 2));

     DocCollection docCollection = getCollectionState(collectionName);
     List<Replica> list = docCollection.getReplicas(firstNode);
@@ -161,7 +165,7 @@ public class TestPolicyCloud extends SimSolrCloudTestCase {

     CollectionAdminRequest.splitShard(collectionName).setShardName("shard1").process(solrClient);

-    waitForState("Timed out waiting to see 6 replicas for collection: " + collectionName,
+    CloudTestUtils.waitForState(cluster, "Timed out waiting to see 6 replicas for collection: " + collectionName,
         collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 6);

     docCollection = getCollectionState(collectionName);
@@ -206,7 +210,8 @@ public class TestPolicyCloud extends SimSolrCloudTestCase {
     //org.eclipse.jetty.server.handler.DefaultHandler.2xx-responses
     CollectionAdminRequest.createCollection("metricsTest", "conf", 1, 1)
         .process(solrClient);
-    waitForState("Timeout waiting for collection to become active", "metricsTest", clusterShape(1, 1));
+    CloudTestUtils.waitForState(cluster, "Timeout waiting for collection to become active", "metricsTest",
+        CloudTestUtils.clusterShape(1, 1));

     DocCollection collection = getCollectionState("metricsTest");
     List<String> tags = Arrays.asList("metrics:solr.node:ADMIN./admin/authorization.clientErrors:count",
@@ -251,7 +256,8 @@ public class TestPolicyCloud extends SimSolrCloudTestCase {

     CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "s1", 1, 1, 1)
         .process(solrClient);
-    waitForState("Timeout waiting for collection to become active", "policiesTest", clusterShape(1, 3));
+    CloudTestUtils.waitForState(cluster, "Timeout waiting for collection to become active", "policiesTest",
+        CloudTestUtils.clusterShape(1, 3));

     DocCollection coll = getCollectionState("policiesTest");

@@ -295,14 +301,16 @@ public class TestPolicyCloud extends SimSolrCloudTestCase {
     CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "s1,s2", 1)
         .setPolicy("c1")
         .process(solrClient);
-    waitForState("Timeout waiting for collection to become active", "policiesTest", clusterShape(2, 1));
+    CloudTestUtils.waitForState(cluster, "Timeout waiting for collection to become active", "policiesTest",
+        CloudTestUtils.clusterShape(2, 1));

     DocCollection coll = getCollectionState("policiesTest");
     assertEquals("c1", coll.getPolicyName());
     assertEquals(2,coll.getReplicas().size());
     coll.forEachReplica((s, replica) -> assertEquals(nodeId, replica.getNodeName()));
     CollectionAdminRequest.createShard("policiesTest", "s3").process(solrClient);
-    waitForState("Timeout waiting for collection to become active", "policiesTest", clusterShape(3, 1));
+    CloudTestUtils.waitForState(cluster, "Timeout waiting for collection to become active", "policiesTest",
+        CloudTestUtils.clusterShape(3, 1));

     coll = getCollectionState("policiesTest");
     assertEquals(1, coll.getSlice("s3").getReplicas().size());
@@ -313,7 +321,8 @@ public class TestPolicyCloud extends SimSolrCloudTestCase {
     SolrClient solrClient = cluster.simGetSolrClient();
     CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "shard1", 2)
         .process(solrClient);
-    waitForState("Timeout waiting for collection to become active", "policiesTest", clusterShape(1, 2));
+    CloudTestUtils.waitForState(cluster, "Timeout waiting for collection to become active", "policiesTest",
+        CloudTestUtils.clusterShape(1, 2));
     DocCollection rulesCollection = getCollectionState("policiesTest");

     Map<String, Object> val = cluster.getNodeStateProvider().getNodeValues(rulesCollection.getReplicas().get(0).getNodeName(), Arrays.asList(
@@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
 import org.apache.solr.cloud.autoscaling.ActionContext;
 import org.apache.solr.cloud.autoscaling.ComputePlanAction;
 import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
@@ -109,6 +110,14 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {

   @Before
   public void setupTest() throws Exception {
+    // disable .scheduled_maintenance
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    SolrClient solrClient = cluster.simGetSolrClient();
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");

     waitForSeconds = 1 + random().nextInt(3);
     actionConstructorCalled = new CountDownLatch(1);
@@ -1140,7 +1149,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
         "conf", 1, 2);
     create.process(solrClient);
-    waitForState(COLL1, 10, TimeUnit.SECONDS, clusterShape(1, 2));
+    CloudTestUtils.waitForState(cluster, COLL1, 10, TimeUnit.SECONDS, CloudTestUtils.clusterShape(1, 2));

     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
@@ -21,15 +21,15 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
  */
 public class AlreadyExistsException extends Exception {

-  private final String path;
+  private final String id;

-  public AlreadyExistsException(String path) {
-    super("Path already exists: " + path);
-    this.path = path;
+  public AlreadyExistsException(String id) {
+    super("Already exists: " + id);
+    this.id = id;
   }

-  public String getPath() {
-    return path;
+  public String getId() {
+    return id;
   }

 }
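The rename from path to id loosens the contract: the argument no longer has to be a ZooKeeper path, just some unique identifier. A usage sketch of the generalized form (the trigger scenario and the triggerExists helper are hypothetical, not taken from this commit):

    // Any identifier works now, not only ZK paths:
    if (triggerExists(triggerName)) {                  // hypothetical lookup
      throw new AlreadyExistsException(triggerName);   // message: "Already exists: <id>"
    }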
@@ -95,6 +95,7 @@ public class ZkStateReader implements Closeable {
   public static final String PROPERTY_VALUE_PROP = "property.value";
   public static final String MAX_AT_ONCE_PROP = "maxAtOnce";
   public static final String MAX_WAIT_SECONDS_PROP = "maxWaitSeconds";
+  public static final String STATE_TIMESTAMP_PROP = "stateTimestamp";
   public static final String COLLECTIONS_ZKNODE = "/collections";
   public static final String LIVE_NODES_ZKNODE = "/live_nodes";
   public static final String ALIASES = "/aliases.json";
@@ -118,7 +118,8 @@ public interface CollectionParams {
     //TODO when we have a node level lock use it here
     REPLACENODE(true, LockLevel.NONE),
     DELETENODE(true, LockLevel.NONE),
-    MOCK_REPLICA_TASK(false, LockLevel.REPLICA)
+    MOCK_REPLICA_TASK(false, LockLevel.REPLICA),
+    NONE(false, LockLevel.NONE)
     ;
     public final boolean isWrite;
@@ -25,14 +25,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 /**
- * Source of time. NOTE: depending on implementation returned values may not be related in any way to the
- * current Epoch or calendar time, and they may even be negative - but the API guarantees that they are
- * always monotonically increasing.
+ * Source of time.
+ * <p>NOTE: depending on implementation returned values may not be related in any way to the
+ * current Epoch or calendar time, and they may even be negative - but the API guarantees that
+ * they are always monotonically increasing.</p>
  */
 public abstract class TimeSource {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

-  /** Implementation that uses {@link System#currentTimeMillis()}. */
+  /**
+   * Implementation that uses {@link System#currentTimeMillis()}.
+   * This implementation's {@link #getTime()} returns the same values as
+   * {@link #getEpochTime()}.
+   */
   public static final class CurrentTimeSource extends TimeSource {

     @Override
@@ -41,6 +46,11 @@ public abstract class TimeSource {
       return TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
     }

+    @Override
+    public long getEpochTime() {
+      return getTime();
+    }
+
     @Override
     public void sleep(long ms) throws InterruptedException {
       Thread.sleep(ms);
@@ -52,14 +62,31 @@ public abstract class TimeSource {
     }
   }

-  /** Implementation that uses {@link System#nanoTime()}. */
+  /**
+   * Implementation that uses {@link System#nanoTime()}.
+   * Epoch time is initialized using {@link CurrentTimeSource}, and then
+   * calculated as the elapsed number of nanoseconds as measured by this
+   * implementation.
+   */
   public static final class NanoTimeSource extends TimeSource {
+    private final long epochStart;
+    private final long nanoStart;
+
+    public NanoTimeSource() {
+      epochStart = CURRENT_TIME.getTime();
+      nanoStart = System.nanoTime();
+    }

     @Override
     public long getTime() {
       return System.nanoTime();
     }

+    @Override
+    public long getEpochTime() {
+      return epochStart + getTime() - nanoStart;
+    }
+
     @Override
     public void sleep(long ms) throws InterruptedException {
       Thread.sleep(ms);
@@ -75,24 +102,27 @@ public abstract class TimeSource {
   public static final class SimTimeSource extends TimeSource {

     final double multiplier;
-    long start;
+    final long nanoStart;
+    final long epochStart;

     /**
-     * Create a simulated time source that runs faster than real time by a multipler.
+     * Create a simulated time source that runs faster than real time by a multiplier.
      * @param multiplier must be greater than 0.0
      */
     public SimTimeSource(double multiplier) {
       this.multiplier = multiplier;
-      start = NANO_TIME.getTime();
-    }
-
-    public void advance(long delta) {
-      start = getTime() + delta;
+      epochStart = CURRENT_TIME.getTime();
+      nanoStart = NANO_TIME.getTime();
     }

     @Override
     public long getTime() {
-      return start + Math.round((double)(NANO_TIME.getTime() - start) * multiplier);
+      return nanoStart + Math.round((double)(NANO_TIME.getTime() - nanoStart) * multiplier);
+    }
+
+    @Override
+    public long getEpochTime() {
+      return epochStart + getTime() - nanoStart;
     }

     @Override
@@ -151,10 +181,18 @@ public abstract class TimeSource {
   }

   /**
-   * Return a time value, in nanosecond unit.
+   * Return a time value, in nanosecond units. Depending on implementation this value may or
+   * may not be related to Epoch time.
    */
   public abstract long getTime();

+  /**
+   * Return Epoch time. Implementations that are not natively based on epoch time may
+   * return values that are consistently off by a (small) fixed number of milliseconds from
+   * the actual epoch time.
+   */
+  public abstract long getEpochTime();
+
   public abstract void sleep(long ms) throws InterruptedException;

   public abstract long convertDelay(TimeUnit fromUnit, long value, TimeUnit toUnit);
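To see how the reworked time sources fit together: getTime() remains a monotonic, interval-only clock, while the new getEpochTime() is anchored to the wall-clock epoch (in nanoseconds) at construction, so its values can be persisted, e.g. as the stateTimestamp slice property written by simSplitShard above. A small usage sketch (the 50x multiplier is an arbitrary example):

    TimeSource sim = new TimeSource.SimTimeSource(50.0); // simulated clock, 50x faster than real time

    long t0 = sim.getTime();                 // monotonic, good only for measuring intervals
    // ... do some simulated work ...
    long elapsedNs = sim.getTime() - t0;

    // Epoch-anchored value, suitable for persisting and comparing later:
    long epochMs = TimeUnit.NANOSECONDS.toMillis(sim.getEpochTime());
    String stateTimestamp = String.valueOf(sim.getEpochTime()); // as stored in STATE_TIMESTAMP_PROP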