SOLR-10515: Persist intermediate trigger state in ZK.

This commit is contained in:
Andrzej Bialecki 2017-05-25 20:16:01 +02:00
parent b933b60407
commit f0054a30d1
23 changed files with 1015 additions and 247 deletions

View File

@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import org.apache.solr.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,41 +32,29 @@ public class ActionThrottle {
private volatile Long minMsBetweenActions;
private final String name;
private final TimeSource timeSource;
private final NanoTimeSource nanoTimeSource;
public interface NanoTimeSource {
long getTime();
}
private static class DefaultNanoTimeSource implements NanoTimeSource {
@Override
public long getTime() {
return System.nanoTime();
}
}
public ActionThrottle(String name, long minMsBetweenActions) {
this.name = name;
this.minMsBetweenActions = minMsBetweenActions;
this.nanoTimeSource = new DefaultNanoTimeSource();
this.timeSource = TimeSource.NANO_TIME;
}
public ActionThrottle(String name, long minMsBetweenActions, NanoTimeSource nanoTimeSource) {
public ActionThrottle(String name, long minMsBetweenActions, TimeSource timeSource) {
this.name = name;
this.minMsBetweenActions = minMsBetweenActions;
this.nanoTimeSource = nanoTimeSource;
this.timeSource = timeSource;
}
public void markAttemptingAction() {
lastActionStartedAt = nanoTimeSource.getTime();
lastActionStartedAt = timeSource.getTime();
}
public void minimumWaitBetweenActions() {
if (lastActionStartedAt == null) {
return;
}
long diff = nanoTimeSource.getTime() - lastActionStartedAt;
long diff = timeSource.getTime() - lastActionStartedAt;
int diffMs = (int) TimeUnit.MILLISECONDS.convert(diff, TimeUnit.NANOSECONDS);
long minNsBetweenActions = TimeUnit.NANOSECONDS.convert(minMsBetweenActions, TimeUnit.MILLISECONDS);
log.info("The last {} attempt started {}ms ago.", name, diffMs);

View File

@ -664,6 +664,8 @@ public class ZkController {
cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, zkClient);
byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);

View File

@ -48,24 +48,15 @@ public class AutoScaling {
AFTER_ACTION
}
public static interface TriggerEvent<T extends Trigger> {
public T getSource();
public long getEventNanoTime();
public void setContext(Map<String, Object> context);
public Map<String, Object> getContext();
}
public static interface TriggerListener<E extends TriggerEvent<? extends Trigger>> {
public interface TriggerListener {
/**
* This method is executed when a trigger is ready to fire.
*
* @param event a subclass of {@link TriggerEvent}
* @return true if the listener was ready to perform actions on the event, false otherwise.
* @return true if the listener was ready to perform actions on the event, false
* otherwise. If false was returned then callers should assume the event was discarded.
*/
public boolean triggerFired(E event);
boolean triggerFired(TriggerEvent event);
}
public static class HttpCallbackListener implements TriggerListener {
@ -78,7 +69,7 @@ public class AutoScaling {
/**
* Interface for a Solr trigger. Each trigger implements Runnable and Closeable interface. A trigger
* is scheduled using a {@link java.util.concurrent.ScheduledExecutorService} so it is executed as
* per a configured schedule to check whether the trigger is ready to fire. The {@link #setListener(TriggerListener)}
* per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setListener(TriggerListener)}
* method should be used to set a callback listener which is fired by implementation of this class whenever
* ready.
* <p>
@ -92,29 +83,47 @@ public class AutoScaling {
* with the proper trigger event object. If that method returns false then it should be interpreted to mean
* that Solr is not ready to process this trigger event and therefore we should retain the state and fire
* at the next invocation of the run() method.
*
* @param <E> the {@link TriggerEvent} which is handled by this Trigger
*/
public static interface Trigger<E extends TriggerEvent<? extends Trigger>> extends Closeable, Runnable {
public String getName();
public interface Trigger extends Closeable, Runnable {
/**
* Trigger name.
*/
String getName();
public EventType getEventType();
/**
* Event type generated by this trigger.
*/
EventType getEventType();
public boolean isEnabled();
/** Returns true if this trigger is enabled. */
boolean isEnabled();
public Map<String, Object> getProperties();
/** Trigger properties. */
Map<String, Object> getProperties();
public int getWaitForSecond();
/** Number of seconds that must elapse after a condition is first detected before the event is fired ("waitFor" property). */
int getWaitForSecond();
public List<TriggerAction> getActions();
/** Actions to execute when event is fired. */
List<TriggerAction> getActions();
public void setListener(TriggerListener<E> listener);
/** Set event listener to call when event is fired. */
void setListener(TriggerListener listener);
public TriggerListener<E> getListener();
/** Get event listener. */
TriggerListener getListener();
public boolean isClosed();
/** Return true when this trigger is closed and cannot be used. */
boolean isClosed();
public void restoreState(Trigger<E> old);
/** Set internal state of this trigger from another instance. */
void restoreState(Trigger old);
/** Save internal state of this trigger in ZooKeeper. */
void saveState();
/** Restore internal state of this trigger from ZooKeeper. */
void restoreState();
/**
* Called before a trigger is scheduled. Any heavy object creation or initialisation should

View File

@ -39,7 +39,6 @@ import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.RequestHandlerBase;
@ -49,6 +48,7 @@ import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthorizationContext;
import org.apache.solr.security.PermissionNameProvider;
import org.apache.solr.util.CommandOperation;
import org.apache.solr.util.TimeSource;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@ -66,6 +66,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
protected final CoreContainer container;
private final List<Map<String, String>> DEFAULT_ACTIONS = new ArrayList<>(3);
private static ImmutableSet<String> singletonCommands = ImmutableSet.of("set-cluster-preferences", "set-cluster-policy");
private static final TimeSource timeSource = TimeSource.CURRENT_TIME;
public AutoScalingHandler(CoreContainer container) {
@ -250,7 +251,6 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
rsp.getValues().add("result", "success");
}
@SuppressForbidden(reason = "currentTimeMillis is used to find the resume time for the trigger")
private void handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String triggerName = op.getStr("name");
@ -263,7 +263,8 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
if (timeout != null) {
try {
int timeoutSeconds = parseHumanTime(timeout);
resumeTime = new Date(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS));
resumeTime = new Date(TimeUnit.MILLISECONDS.convert(timeSource.getTime(), TimeUnit.NANOSECONDS)
+ TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS));
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'timeout' value for suspend trigger: " + triggerName);
}

View File

@ -45,7 +45,7 @@ public class ComputePlanAction implements TriggerAction {
}
@Override
public void process(AutoScaling.TriggerEvent event) {
public void process(TriggerEvent event) {
}
}

View File

@ -45,7 +45,7 @@ public class ExecutePlanAction implements TriggerAction {
}
@Override
public void process(AutoScaling.TriggerEvent event) {
public void process(TriggerEvent event) {
}
}

View File

@ -45,7 +45,7 @@ public class LogPlanAction implements TriggerAction {
}
@Override
public void process(AutoScaling.TriggerEvent event) {
public void process(TriggerEvent event) {
}
}

View File

@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -35,23 +36,25 @@ import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Trigger for the {@link org.apache.solr.cloud.autoscaling.AutoScaling.EventType#NODEADDED} event
*/
public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
public class NodeAddedTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String name;
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
private final AtomicReference<AutoScaling.TriggerListener<NodeAddedEvent>> listenerRef;
private final AtomicReference<AutoScaling.TriggerListener> listenerRef;
private final boolean enabled;
private final int waitForSecond;
private final AutoScaling.EventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
@ -61,9 +64,11 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
public NodeAddedTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
super(container.getZkController().getZkClient());
this.name = name;
this.properties = properties;
this.container = container;
this.timeSource = TimeSource.CURRENT_TIME;
this.listenerRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
@ -75,6 +80,8 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
} else {
actions = Collections.emptyList();
}
lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = (boolean) properties.getOrDefault("enabled", true);
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
@ -93,12 +100,12 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
}
@Override
public void setListener(AutoScaling.TriggerListener<NodeAddedEvent> listener) {
public void setListener(AutoScaling.TriggerListener listener) {
listenerRef.set(listener);
}
@Override
public AutoScaling.TriggerListener<NodeAddedEvent> getListener() {
public AutoScaling.TriggerListener getListener() {
return listenerRef.get();
}
@ -156,7 +163,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
}
@Override
public void restoreState(AutoScaling.Trigger<NodeAddedEvent> old) {
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
if (old instanceof NodeAddedTrigger) {
NodeAddedTrigger that = (NodeAddedTrigger) old;
@ -169,6 +176,28 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
}
}
/**
 * Builds a map describing this trigger's current internal state: the last observed set of
 * live nodes and the map of node name to the time it was first seen as added. Both
 * collections are stored in the map by reference, not copied.
 *
 * @return a new map with the keys "lastLiveNodes" and "nodeNameVsTimeAdded"
 */
@Override
protected Map<String, Object> getState() {
  final Map<String, Object> snapshot = new HashMap<>();
  snapshot.put("lastLiveNodes", lastLiveNodes);
  snapshot.put("nodeNameVsTimeAdded", nodeNameVsTimeAdded);
  return snapshot;
}
/**
 * Replaces the tracked live nodes and node-added times with the values found in the given
 * state map. Both collections are cleared first, so a key that is absent from
 * {@code state} leaves the corresponding collection empty.
 *
 * @param state map read under the keys "lastLiveNodes" and "nodeNameVsTimeAdded"
 */
@Override
protected void setState(Map<String, Object> state) {
  this.lastLiveNodes.clear();
  this.nodeNameVsTimeAdded.clear();
  // locals are named differently from the fields so nothing is shadowed
  Collection<String> savedLiveNodes = (Collection<String>) state.get("lastLiveNodes");
  if (savedLiveNodes != null) {
    this.lastLiveNodes.addAll(savedLiveNodes);
  }
  Map<String, Long> savedTimes = (Map<String, Long>) state.get("nodeNameVsTimeAdded");
  if (savedTimes != null) {
    this.nodeNameVsTimeAdded.putAll(savedTimes);
  }
}
@Override
public void run() {
try {
@ -183,10 +212,6 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
ZkStateReader reader = container.getZkController().getZkStateReader();
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
log.debug("Found livenodes: {}", newLiveNodes);
if (lastLiveNodes == null) {
lastLiveNodes = newLiveNodes;
return;
}
// have any nodes that we were tracking been removed from the cluster?
// if so, remove them from the tracking map
@ -197,22 +222,22 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
Set<String> copyOfNew = new HashSet<>(newLiveNodes);
copyOfNew.removeAll(lastLiveNodes);
copyOfNew.forEach(n -> {
long nanoTime = System.nanoTime();
nodeNameVsTimeAdded.put(n, nanoTime);
log.info("Tracking new node: {} at {} nanotime", n, nanoTime);
long eventTime = timeSource.getTime();
nodeNameVsTimeAdded.put(n, eventTime);
log.debug("Tracking new node: {} at time {}", n, eventTime);
});
// has enough time expired to trigger events for a node?
for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
String nodeName = entry.getKey();
Long timeAdded = entry.getValue();
long now = System.nanoTime();
long now = timeSource.getTime();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
AutoScaling.TriggerListener listener = listenerRef.get();
if (listener != null) {
log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now);
if (listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName))) {
log.debug("NodeAddedTrigger {} firing registered listener for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
// remove from tracking set only if the fire was accepted
trackingKeySet.remove(nodeName);
}
@ -222,7 +247,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
}
}
lastLiveNodes = newLiveNodes;
lastLiveNodes = new HashSet(newLiveNodes);
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeAddedTrigger", e);
}
@ -235,45 +260,10 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
}
}
public static class NodeAddedEvent implements AutoScaling.TriggerEvent<NodeAddedTrigger> {
private final NodeAddedTrigger source;
private final long nodeAddedNanoTime;
private final String nodeName;
public static class NodeAddedEvent extends TriggerEvent {
private Map<String, Object> context;
public NodeAddedEvent(NodeAddedTrigger source, long nodeAddedNanoTime, String nodeAdded) {
this.source = source;
this.nodeAddedNanoTime = nodeAddedNanoTime;
this.nodeName = nodeAdded;
}
@Override
public NodeAddedTrigger getSource() {
return source;
}
@Override
public long getEventNanoTime() {
return nodeAddedNanoTime;
}
public String getNodeName() {
return nodeName;
}
public AutoScaling.EventType getType() {
return source.getEventType();
}
@Override
public void setContext(Map<String, Object> context) {
this.context = context;
}
@Override
public Map<String, Object> getContext() {
return context;
public NodeAddedEvent(AutoScaling.EventType eventType, String source, long nodeAddedTime, String nodeAdded) {
super(eventType, source, nodeAddedTime, Collections.singletonMap(NODE_NAME, nodeAdded));
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -35,23 +36,25 @@ import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Trigger for the {@link AutoScaling.EventType#NODELOST} event
*/
public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
public class NodeLostTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String name;
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
private final AtomicReference<AutoScaling.TriggerListener<NodeLostEvent>> listenerRef;
private final AtomicReference<AutoScaling.TriggerListener> listenerRef;
private final boolean enabled;
private final int waitForSecond;
private final AutoScaling.EventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
@ -61,9 +64,11 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
public NodeLostTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
super(container.getZkController().getZkClient());
this.name = name;
this.properties = properties;
this.container = container;
this.timeSource = TimeSource.CURRENT_TIME;
this.listenerRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
@ -75,7 +80,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
} else {
actions = Collections.emptyList();
}
lastLiveNodes = container.getZkController().getZkStateReader().getClusterState().getLiveNodes();
lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = (boolean) properties.getOrDefault("enabled", true);
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
@ -94,12 +99,12 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
}
@Override
public void setListener(AutoScaling.TriggerListener<NodeLostEvent> listener) {
public void setListener(AutoScaling.TriggerListener listener) {
listenerRef.set(listener);
}
@Override
public AutoScaling.TriggerListener<NodeLostEvent> getListener() {
public AutoScaling.TriggerListener getListener() {
return listenerRef.get();
}
@ -157,7 +162,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
}
@Override
public void restoreState(AutoScaling.Trigger<NodeLostEvent> old) {
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
if (old instanceof NodeLostTrigger) {
NodeLostTrigger that = (NodeLostTrigger) old;
@ -170,6 +175,28 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
}
}
/**
 * Builds a map describing this trigger's current internal state: the last observed set of
 * live nodes and the map of node name to the time it was first seen as removed. Both
 * collections are stored in the map by reference, not copied.
 *
 * @return a new map with the keys "lastLiveNodes" and "nodeNameVsTimeRemoved"
 */
@Override
protected Map<String, Object> getState() {
  final Map<String, Object> snapshot = new HashMap<>();
  snapshot.put("lastLiveNodes", lastLiveNodes);
  snapshot.put("nodeNameVsTimeRemoved", nodeNameVsTimeRemoved);
  return snapshot;
}
/**
 * Replaces the tracked live nodes and node-removed times with the values found in the given
 * state map. Both collections are cleared first, so a key that is absent from
 * {@code state} leaves the corresponding collection empty.
 *
 * @param state map read under the keys "lastLiveNodes" and "nodeNameVsTimeRemoved"
 */
@Override
protected void setState(Map<String, Object> state) {
  this.lastLiveNodes.clear();
  this.nodeNameVsTimeRemoved.clear();
  // locals are named differently from the fields so nothing is shadowed
  Collection<String> savedLiveNodes = (Collection<String>) state.get("lastLiveNodes");
  if (savedLiveNodes != null) {
    this.lastLiveNodes.addAll(savedLiveNodes);
  }
  Map<String, Long> savedTimes = (Map<String, Long>) state.get("nodeNameVsTimeRemoved");
  if (savedTimes != null) {
    this.nodeNameVsTimeRemoved.putAll(savedTimes);
  }
}
@Override
public void run() {
try {
@ -194,20 +221,20 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
Set<String> copyOfLastLiveNodes = new HashSet<>(lastLiveNodes);
copyOfLastLiveNodes.removeAll(newLiveNodes);
copyOfLastLiveNodes.forEach(n -> {
log.info("Tracking lost node: {}", n);
nodeNameVsTimeRemoved.put(n, System.nanoTime());
log.debug("Tracking lost node: {}", n);
nodeNameVsTimeRemoved.put(n, timeSource.getTime());
});
// has enough time expired to trigger events for a node?
for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
String nodeName = entry.getKey();
Long timeRemoved = entry.getValue();
if (TimeUnit.SECONDS.convert(System.nanoTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
AutoScaling.TriggerListener listener = listenerRef.get();
if (listener != null) {
log.info("NodeLostTrigger firing registered listener");
if (listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName))) {
log.debug("NodeLostTrigger firing registered listener");
if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
trackingKeySet.remove(nodeName);
}
} else {
@ -216,7 +243,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
}
}
lastLiveNodes = newLiveNodes;
lastLiveNodes = new HashSet<>(newLiveNodes);
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeLostTrigger", e);
}
@ -229,45 +256,10 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
}
}
public static class NodeLostEvent implements AutoScaling.TriggerEvent<NodeLostTrigger> {
private final NodeLostTrigger source;
private final long nodeLostNanoTime;
private final String nodeName;
public static class NodeLostEvent extends TriggerEvent {
private Map<String, Object> context;
public NodeLostEvent(NodeLostTrigger source, long nodeLostNanoTime, String nodeRemoved) {
this.source = source;
this.nodeLostNanoTime = nodeLostNanoTime;
this.nodeName = nodeRemoved;
}
@Override
public NodeLostTrigger getSource() {
return source;
}
@Override
public long getEventNanoTime() {
return nodeLostNanoTime;
}
public String getNodeName() {
return nodeName;
}
public AutoScaling.EventType getType() {
return source.getEventType();
}
@Override
public void setContext(Map<String, Object> context) {
this.context = context;
}
@Override
public Map<String, Object> getContext() {
return context;
public NodeLostEvent(AutoScaling.EventType eventType, String source, long nodeLostTime, String nodeRemoved) {
super(eventType, source, nodeLostTime, Collections.singletonMap(NODE_NAME, nodeRemoved));
}
}
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -77,7 +78,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
this.zkController = zkController;
zkStateReader = zkController.getZkStateReader();
zkClient = zkController.getZkClient();
scheduledTriggers = new ScheduledTriggers();
scheduledTriggers = new ScheduledTriggers(zkClient);
triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
}

View File

@ -21,11 +21,11 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
@ -35,9 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,7 +54,7 @@ public class ScheduledTriggers implements Closeable {
static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
private final Map<String, ScheduledTrigger> scheduledTriggers = new ConcurrentHashMap<>();
/**
* Thread pool for scheduling the triggers
@ -70,7 +74,11 @@ public class ScheduledTriggers implements Closeable {
private final ActionThrottle actionThrottle;
public ScheduledTriggers() {
private final SolrZkClient zkClient;
private final Overseer.Stats queueStats;
public ScheduledTriggers(SolrZkClient zkClient) {
// todo make the core pool size configurable
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
// ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand
@ -83,6 +91,8 @@ public class ScheduledTriggers implements Closeable {
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
// todo make the wait time configurable
actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
this.zkClient = zkClient;
queueStats = new Overseer.Stats();
}
/**
@ -97,7 +107,7 @@ public class ScheduledTriggers implements Closeable {
if (isClosed) {
throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
}
ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger);
ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger, zkClient, queueStats);
ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger);
if (old != null) {
if (old.trigger.equals(newTrigger)) {
@ -106,20 +116,34 @@ public class ScheduledTriggers implements Closeable {
}
IOUtils.closeQuietly(old);
newTrigger.restoreState(old.trigger);
scheduledTrigger.setReplay(false);
scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
}
newTrigger.setListener(event -> {
AutoScaling.Trigger source = event.getSource();
ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
if (scheduledSource == null) {
log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + event.getSource() + " doesn't exist.");
return false;
}
boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
AutoScaling.Trigger source = scheduledSource.trigger;
if (source.isClosed()) {
log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed");
// we do not want to lose this event just because the trigger were closed, perhaps a replacement will need it
log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + source + " has already been closed");
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
return false;
}
if (hasPendingActions.compareAndSet(false, true)) {
final boolean enqueued;
if (replaying) {
enqueued = false;
} else {
enqueued = scheduledTrigger.enqueue(event);
}
List<TriggerAction> actions = source.getActions();
if (actions != null) {
actionExecutor.submit(() -> {
assert hasPendingActions.get();
log.debug("-- processing actions for " + event);
try {
// let the action executor thread wait instead of the trigger thread so we use the throttle here
actionThrottle.minimumWaitBetweenActions();
@ -132,10 +156,23 @@ public class ScheduledTriggers implements Closeable {
throw e;
}
}
if (enqueued) {
TriggerEvent ev = scheduledTrigger.dequeue();
assert ev.getId().equals(event.getId());
}
} finally {
hasPendingActions.set(false);
}
});
} else {
if (enqueued) {
TriggerEvent ev = scheduledTrigger.dequeue();
if (!ev.getId().equals(event.getId())) {
throw new RuntimeException("Wrong event dequeued, queue of " + scheduledTrigger.trigger.getName()
+ " is broken! Expected event=" + event + " but got " + ev);
}
}
hasPendingActions.set(false);
}
return true;
} else {
@ -148,19 +185,37 @@ public class ScheduledTriggers implements Closeable {
}
/**
* Removes and stops the trigger with the given name
* Removes and stops the trigger with the given name. Also cleans up any leftover
* state / events in ZK.
*
* @param triggerName the name of the trigger to be removed
* @throws AlreadyClosedException if this class has already been closed
*/
public synchronized void remove(String triggerName) {
  if (isClosed) {
    throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used any more");
  }
  // the map returns null when no trigger is registered under this name
  final ScheduledTrigger stale = scheduledTriggers.remove(triggerName);
  IOUtils.closeQuietly(stale);
  // ZK cleanup runs regardless of whether a trigger was actually registered
  removeTriggerZKData(triggerName);
}
/**
 * Best-effort removal of the ZK nodes kept for a trigger: its saved state node and its
 * events node. Each node is deleted only if it exists. Failures are logged at WARN and are
 * not propagated, so a failure on the state node does not prevent the attempt on the events
 * node.
 * <p>
 * If the thread is interrupted during either ZK call, the interrupt status is restored
 * before returning so that callers and the owning executor can still observe it.
 * <p>
 * NOTE(review): if {@code delete} is non-recursive and the events node still has queued
 * child nodes, the delete presumably fails and is only logged here - confirm.
 *
 * @param triggerName name of the trigger whose ZK data should be removed
 */
private void removeTriggerZKData(String triggerName) {
  String statePath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName;
  String eventsPath = ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName;
  // Remember an interrupt instead of re-asserting it immediately, so that the second
  // delete is still attempted exactly as before.
  boolean interrupted = false;
  try {
    if (zkClient.exists(statePath, true)) {
      zkClient.delete(statePath, -1, true);
    }
  } catch (KeeperException e) {
    log.warn("Failed to remove state for removed trigger " + statePath, e);
  } catch (InterruptedException e) {
    interrupted = true;
    log.warn("Failed to remove state for removed trigger " + statePath, e);
  }
  try {
    if (zkClient.exists(eventsPath, true)) {
      zkClient.delete(eventsPath, -1, true);
    }
  } catch (KeeperException e) {
    log.warn("Failed to remove events for removed trigger " + eventsPath, e);
  } catch (InterruptedException e) {
    interrupted = true;
    log.warn("Failed to remove events for removed trigger " + eventsPath, e);
  }
  if (interrupted) {
    // do not swallow the interrupt: restore the flag for whoever owns this thread
    Thread.currentThread().interrupt();
  }
}
/**
* @return an unmodifiable set of names of all triggers being managed by this class
*/
@ -178,36 +233,91 @@ public class ScheduledTriggers implements Closeable {
}
scheduledTriggers.clear();
}
ExecutorUtil.shutdownAndAwaitTermination(scheduledThreadPoolExecutor);
ExecutorUtil.shutdownAndAwaitTermination(actionExecutor);
// shutdown and interrupt all running tasks because there's no longer any
// guarantee about cluster state
scheduledThreadPoolExecutor.shutdownNow();
actionExecutor.shutdownNow();
}
private class ScheduledTrigger implements Runnable, Closeable {
AutoScaling.Trigger trigger;
ScheduledFuture<?> scheduledFuture;
TriggerEventQueue queue;
boolean replay;
volatile boolean isClosed;
ScheduledTrigger(AutoScaling.Trigger trigger) {
ScheduledTrigger(AutoScaling.Trigger trigger, SolrZkClient zkClient, Overseer.Stats stats) {
this.trigger = trigger;
this.queue = new TriggerEventQueue(zkClient, trigger.getName(), stats);
this.replay = true;
this.isClosed = false;
}
/**
 * Sets the replay flag. While the flag is true, the next {@link #run()} that finds no
 * pending actions first re-fires the events still present in this trigger's queue and
 * restores the trigger's saved state, then clears the flag.
 *
 * @param replay true to replay queued events on the next run, false to skip replay
 */
public void setReplay(boolean replay) {
this.replay = replay;
}
/**
 * Offers the given event to this trigger's event queue.
 *
 * @param event the event to add to the queue
 * @return whatever the queue's {@code offerEvent} returns for this event
 * @throws AlreadyClosedException if this scheduled trigger has been closed
 */
public boolean enqueue(TriggerEvent event) {
  if (!isClosed) {
    return queue.offerEvent(event);
  }
  throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
}
/**
 * Polls the next event from this trigger's event queue.
 *
 * @return whatever the queue's {@code pollEvent} returns
 * @throws AlreadyClosedException if this scheduled trigger has been closed
 */
public TriggerEvent dequeue() {
  if (!isClosed) {
    return queue.pollEvent();
  }
  throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
}
/**
 * Runs one scheduled iteration of the wrapped trigger.
 * <p>
 * Nothing happens while an action is pending. Otherwise, when the replay flag is set,
 * queued events are first re-delivered to the trigger's listener (marked with
 * {@link TriggerEvent#REPLAYING}) and the trigger's saved state is restored; then the
 * trigger itself runs and its state is checkpointed.
 *
 * @throws AlreadyClosedException if this scheduled trigger has been closed
 */
@Override
public void run() {
  if (isClosed) {
    throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
  }
  // fire a trigger only if an action is not pending
  // note this is not fool proof e.g. it does not prevent an action being executed while a trigger
  // is still executing. There is additional protection against that scenario in the event listener.
  if (!hasPendingActions.get()) {
    // replay accumulated events on first run, if any
    if (replay) {
      TriggerEvent event;
      // peek first without removing - we may crash before calling the listener
      while ((event = queue.peekEvent()) != null) {
        // override REPLAYING=true
        event.getProperties().put(TriggerEvent.REPLAYING, true);
        if (! trigger.getListener().triggerFired(event)) {
          log.error("Failed to re-play event, discarding: " + event);
        }
        queue.pollEvent(); // always remove it from queue
      }
      // now restore saved state to possibly generate new events from old state on the first run
      try {
        trigger.restoreState();
      } catch (Exception e) {
        // log but don't throw - see below
        log.error("Error restoring trigger state " + trigger.getName(), e);
      }
      replay = false;
    }
    try {
      trigger.run();
    } catch (Exception e) {
      // log but do not propagate exception because an exception thrown from a scheduled operation
      // will suppress future executions
      log.error("Unexpected exception from trigger: " + trigger.getName(), e);
    } finally {
      // checkpoint after each run; guarded for the same reason as trigger.run() above -
      // a failure while saving state must not escape and cancel future executions
      try {
        trigger.saveState();
      } catch (Exception e) {
        log.error("Error saving trigger state " + trigger.getName(), e);
      }
    }
  }
}
@Override
public void close() throws IOException {
isClosed = true;
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}

View File

@ -30,5 +30,5 @@ public interface TriggerAction extends MapInitializedPlugin, Closeable {
public String getClassName();
public void process(AutoScaling.TriggerEvent event);
public void process(TriggerEvent event);
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} implementations.
 * It handles state snapshot / restore in ZK: subclasses expose their internal state as a map
 * via {@link #getState()} / {@link #setState(Map)} and this class stores it as JSON under
 * {@link ZkStateReader#SOLR_AUTOSCALING_TRIGGER_STATE_PATH}/&lt;trigger name&gt;.
 */
public abstract class TriggerBase implements AutoScaling.Trigger {
  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  protected SolrZkClient zkClient;
  /** Last state successfully written to or read from ZK; used to skip redundant writes. */
  protected Map<String,Object> lastState;

  /**
   * @param zkClient client used for all state reads and writes. The constructor also tries
   *                 to make the parent state path; a failure is logged, not thrown.
   */
  protected TriggerBase(SolrZkClient zkClient) {
    this.zkClient = zkClient;
    try {
      zkClient.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, false, true);
    } catch (KeeperException | InterruptedException e) {
      preserveInterrupt(e);
      LOG.warn("Exception checking ZK path " + ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, e);
    }
  }

  /**
   * Prepare and return internal state of this trigger in a format suitable for persisting in ZK.
   * @return map of internal state properties. Note: values must be supported by {@link Utils#toJSON(Object)}.
   */
  protected abstract Map<String,Object> getState();

  /**
   * Restore internal state of this trigger from properties retrieved from ZK.
   * @param state never null but may be empty.
   */
  protected abstract void setState(Map<String,Object> state);

  /**
   * Writes the current state to ZK unless it equals the last saved or restored state.
   * ZK failures are logged and leave {@link #lastState} unchanged, so the next call retries.
   */
  @Override
  public void saveState() {
    // work on a deep copy so the comparison below uses the same form as in restoreState()
    Map<String,Object> state = Utils.getDeepCopy(getState(), 10, false, true);
    if (lastState != null && lastState.equals(state)) {
      // skip saving if identical
      return;
    }
    byte[] data = Utils.toJSON(state);
    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
    try {
      if (zkClient.exists(path, true)) {
        // update
        zkClient.setData(path, data, -1, true);
      } else {
        // create
        zkClient.create(path, data, CreateMode.PERSISTENT, true);
      }
      lastState = state;
    } catch (KeeperException | InterruptedException e) {
      preserveInterrupt(e);
      LOG.warn("Exception updating trigger state '" + path + "'", e);
    }
  }

  /**
   * Reads the saved state from ZK, if any, and passes it to {@link #setState(Map)}.
   * Does nothing when no state node exists or when reading it fails (the failure is logged).
   */
  @Override
  public void restoreState() {
    byte[] data = null;
    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
    try {
      if (zkClient.exists(path, true)) {
        data = zkClient.getData(path, null, new Stat(), true);
      }
    } catch (KeeperException | InterruptedException e) {
      preserveInterrupt(e);
      LOG.warn("Exception getting trigger state '" + path + "'", e);
    }
    if (data != null) {
      // saveState() always writes a JSON object, so a map is expected here
      @SuppressWarnings("unchecked")
      Map<String, Object> state = (Map<String, Object>)Utils.fromJSON(data);
      // make sure lastState is sorted
      state = Utils.getDeepCopy(state, 10, false, true);
      setState(state);
      lastState = state;
    }
  }

  /** Re-asserts the thread's interrupt flag when the caught exception was an interruption. */
  private static void preserveInterrupt(Exception e) {
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
  }
}

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.solr.common.MapWriter;
import org.apache.solr.util.IdUtils;
/**
 * Trigger event: an id, the name of the trigger that produced it, the event time,
 * the event type and a modifiable map of additional properties.
 */
public class TriggerEvent implements MapWriter {
  /** Property name used to mark events that are re-delivered from the queue. */
  public static final String REPLAYING = "replaying";
  /** Property name under which node-related events carry the node name. */
  public static final String NODE_NAME = "nodeName";

  protected final String id;
  protected final String source;
  protected final long eventTime;
  protected final AutoScaling.EventType eventType;
  protected final Map<String, Object> properties = new HashMap<>();

  /**
   * Creates an event with an id generated by {@link IdUtils#timeRandomId(long)} from the event time.
   *
   * @param eventType  event type
   * @param source     name of the trigger that fired the event
   * @param eventTime  timestamp of the actual event
   * @param properties may be null. A shallow copy of this parameter is used.
   */
  public TriggerEvent(AutoScaling.EventType eventType, String source, long eventTime,
                      Map<String, Object> properties) {
    this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties);
  }

  /**
   * Creates an event with an explicit id.
   *
   * @param id         unique event id
   * @param eventType  event type
   * @param source     name of the trigger that fired the event
   * @param eventTime  timestamp of the actual event
   * @param properties may be null. A shallow copy of this parameter is used.
   */
  public TriggerEvent(String id, AutoScaling.EventType eventType, String source, long eventTime,
                      Map<String, Object> properties) {
    this.id = id;
    this.eventType = eventType;
    this.source = source;
    this.eventTime = eventTime;
    if (properties != null) {
      this.properties.putAll(properties);
    }
  }

  /**
   * Unique event id.
   */
  public String getId() {
    return id;
  }

  /**
   * Name of the trigger that fired the event.
   */
  public String getSource() {
    return source;
  }

  /**
   * Timestamp of the actual event, in nanoseconds.
   * NOTE: this is NOT the timestamp when the event was fired - events may be fired
   * much later than the actual condition that generated the event, due to the "waitFor" limit.
   */
  public long getEventTime() {
    return eventTime;
  }

  /**
   * Get event properties (modifiable).
   */
  public Map<String, Object> getProperties() {
    return properties;
  }

  /**
   * Get a named event property or null if missing.
   */
  public Object getProperty(String name) {
    return properties.get(name);
  }

  /**
   * Event type.
   */
  public AutoScaling.EventType getEventType() {
    return eventType;
  }

  /**
   * Set event properties.
   *
   * @param properties may be null. A shallow copy of this parameter is used.
   */
  public void setProperties(Map<String, Object> properties) {
    this.properties.clear();
    if (properties != null) {
      this.properties.putAll(properties);
    }
  }

  /**
   * Writes the id, source, event time, event type (as a string) and properties.
   */
  @Override
  public void writeMap(EntryWriter ew) throws IOException {
    ew.put("id", id);
    ew.put("source", source);
    ew.put("eventTime", eventTime);
    ew.put("eventType", eventType.toString());
    ew.put("properties", properties);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    TriggerEvent that = (TriggerEvent) o;

    if (eventTime != that.eventTime) return false;
    if (!id.equals(that.id)) return false;
    if (!source.equals(that.source)) return false;
    if (eventType != that.eventType) return false;
    return properties.equals(that.properties);
  }

  @Override
  public int hashCode() {
    int result = id.hashCode();
    result = 31 * result + source.hashCode();
    result = 31 * result + (int) (eventTime ^ (eventTime >>> 32));
    result = 31 * result + eventType.hashCode();
    result = 31 * result + properties.hashCode();
    return result;
  }

  /**
   * Human-readable form listing every field that takes part in {@link #equals(Object)},
   * including the event type.
   */
  @Override
  public String toString() {
    return this.getClass().getSimpleName() + "{" +
        "id='" + id + '\'' +
        ", source='" + source + '\'' +
        ", eventTime=" + eventTime +
        ", eventType=" + eventType +
        ", properties=" + properties +
        '}';
  }
}

View File

@ -0,0 +1,100 @@
package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeSource;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link DistributedQueue} of {@link TriggerEvent}s for a single trigger, rooted at
 * {@link ZkStateReader#SOLR_AUTOSCALING_EVENTS_PATH}/&lt;trigger name&gt;. Events are stored
 * as JSON; enqueue and dequeue timestamps are added to the event properties.
 */
public class TriggerEventQueue extends DistributedQueue {
  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** Property set by {@link #offerEvent(TriggerEvent)} with the time of enqueueing. */
  public static final String ENQUEUE_TIME = "_enqueue_time_";
  /** Property set on every event read back from the queue with the time of reading. */
  public static final String DEQUEUE_TIME = "_dequeue_time_";

  private final String triggerName;
  private final TimeSource timeSource;

  /**
   * @param zookeeper   ZK client passed to the parent queue
   * @param triggerName trigger name; used as the last path segment of the queue and in log messages
   * @param stats       stats object passed to the parent queue
   */
  public TriggerEventQueue(SolrZkClient zookeeper, String triggerName, Overseer.Stats stats) {
    super(zookeeper, ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName, stats);
    this.triggerName = triggerName;
    this.timeSource = TimeSource.CURRENT_TIME;
  }

  /**
   * Stamps the event with {@link #ENQUEUE_TIME} and appends it to the queue.
   *
   * @param event event to store; its properties are modified
   * @return true if the event was stored, false if the queue operation failed (failure is logged)
   */
  public boolean offerEvent(TriggerEvent event) {
    event.getProperties().put(ENQUEUE_TIME, timeSource.getTime());
    try {
      byte[] data = Utils.toJSON(event);
      offer(data);
      return true;
    } catch (KeeperException | InterruptedException e) {
      preserveInterrupt(e);
      LOG.warn("Exception adding event " + event + " to queue " + triggerName, e);
      return false;
    }
  }

  /**
   * Returns the first usable event without removing it. Entries at the head of the queue that
   * are empty or cannot be parsed are removed so that they cannot block the queue.
   *
   * @return the head event, or null if the queue has no usable event or the queue operation failed
   */
  public TriggerEvent peekEvent() {
    byte[] data;
    try {
      while ((data = peek()) != null) {
        TriggerEvent event = decode(data);
        if (event != null) {
          return event;
        }
        // peek() does not consume the entry, so an unusable head would be returned again
        // on every iteration; drop it to let the loop advance.
        // NOTE(review): assumes a single consumer per trigger queue - confirm.
        poll();
      }
    } catch (KeeperException | InterruptedException e) {
      preserveInterrupt(e);
      LOG.warn("Exception peeking queue of trigger " + triggerName, e);
    }
    return null;
  }

  /**
   * Removes and returns the first usable event, discarding empty or unparseable entries.
   *
   * @return the removed event, or null if the queue has no usable event or the queue operation failed
   */
  public TriggerEvent pollEvent() {
    byte[] data;
    try {
      while ((data = poll()) != null) {
        TriggerEvent event = decode(data);
        if (event != null) {
          return event;
        }
      }
    } catch (KeeperException | InterruptedException e) {
      preserveInterrupt(e);
      LOG.warn("Exception polling queue of trigger " + triggerName, e);
    }
    return null;
  }

  /** Parses one queue entry; returns null (after logging) when it is empty or invalid. */
  private TriggerEvent decode(byte[] data) {
    if (data.length == 0) {
      LOG.warn("ignoring empty data...");
      return null;
    }
    try {
      @SuppressWarnings("unchecked")
      Map<String, Object> map = (Map<String, Object>) Utils.fromJSON(data);
      return fromMap(map);
    } catch (Exception e) {
      // decode with an explicit charset so the logged text does not depend on the platform default
      LOG.warn("Invalid event data, ignoring: " + new String(data, java.nio.charset.StandardCharsets.UTF_8), e);
      return null;
    }
  }

  /** Rebuilds an event from its JSON map form and stamps it with {@link #DEQUEUE_TIME}. */
  @SuppressWarnings("unchecked")
  private TriggerEvent fromMap(Map<String, Object> map) {
    String id = (String)map.get("id");
    String source = (String)map.get("source");
    long eventTime = ((Number)map.get("eventTime")).longValue();
    AutoScaling.EventType eventType = AutoScaling.EventType.valueOf((String)map.get("eventType"));
    Map<String, Object> properties = (Map<String, Object>)map.get("properties");
    TriggerEvent res = new TriggerEvent(id, eventType, source, eventTime, properties);
    res.getProperties().put(DEQUEUE_TIME, timeSource.getTime());
    return res;
  }

  /** Re-asserts the thread's interrupt flag when the caught exception was an interruption. */
  private static void preserveInterrupt(Exception e) {
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
  }
}

View File

@ -0,0 +1,39 @@
package org.apache.solr.util;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.StringHelper;
/**
 * Static helpers that produce unique identifiers.
 */
public class IdUtils {

  /**
   * Produce a short random id, i.e. the string form of {@link StringHelper#randomId()}.
   */
  public static final String randomId() {
    byte[] raw = StringHelper.randomId();
    return StringHelper.idToString(raw);
  }

  /**
   * Produce an id of the form <code>hex(timestamp) + 'T' + randomId</code>, where the
   * timestamp is the value of {@link TimeSource#CURRENT_TIME} converted to milliseconds.
   */
  public static final String timeRandomId() {
    long nowNs = TimeSource.CURRENT_TIME.getTime();
    return timeRandomId(TimeUnit.NANOSECONDS.toMillis(nowNs));
  }

  /**
   * Produce an id of the form <code>hex(timestamp) + 'T' + randomId</code>.
   * @param time value used as the timestamp part
   */
  public static final String timeRandomId(long time) {
    return Long.toHexString(time) + 'T' + randomId();
  }
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.util;
import java.util.concurrent.TimeUnit;
import org.apache.solr.common.util.SuppressForbidden;
/**
 * Abstraction over a clock: every implementation reports its time in nanoseconds.
 */
public abstract class TimeSource {

  /** Wall-clock source backed by {@link System#currentTimeMillis()}, scaled to nanoseconds. */
  public static final class CurrentTimeSource extends TimeSource {

    @Override
    @SuppressForbidden(reason = "Needed to provide timestamps based on currentTimeMillis.")
    public long getTime() {
      long nowMs = System.currentTimeMillis();
      return TimeUnit.MILLISECONDS.toNanos(nowMs);
    }
  }

  /** Source backed by {@link System#nanoTime()}. */
  public static final class NanoTimeSource extends TimeSource {

    @Override
    public long getTime() {
      return System.nanoTime();
    }
  }

  /** Shared {@link CurrentTimeSource} instance. */
  public static final TimeSource CURRENT_TIME = new CurrentTimeSource();

  /** Shared {@link NanoTimeSource} instance. */
  public static final TimeSource NANO_TIME = new NanoTimeSource();

  /**
   * @return the current timestamp of this source, expressed in nanoseconds
   */
  public abstract long getTime();
}

View File

@ -21,12 +21,12 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.ActionThrottle.NanoTimeSource;
import org.apache.solr.util.TimeSource;
import org.junit.Test;
public class ActionThrottleTest extends SolrTestCaseJ4 {
static class TestNanoTimeSource implements NanoTimeSource {
static class TestNanoTimeSource extends TimeSource {
private List<Long> returnValues;
private int index = 0;
@ -41,35 +41,38 @@ public class ActionThrottleTest extends SolrTestCaseJ4 {
}
}
// use the same time source as ActionThrottle
private static final TimeSource timeSource = TimeSource.NANO_TIME;
@Test
public void testBasics() throws Exception {
ActionThrottle at = new ActionThrottle("test", 1000);
long start = System.nanoTime();
long start = timeSource.getTime();
at.minimumWaitBetweenActions();
// should be no wait
assertTrue(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS) < 1000);
assertTrue(TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS) < 1000);
at.markAttemptingAction();
if (random().nextBoolean()) Thread.sleep(100);
at.minimumWaitBetweenActions();
long elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
long elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS);
assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);
start = System.nanoTime();
start = timeSource.getTime();
at.markAttemptingAction();
at.minimumWaitBetweenActions();
Thread.sleep(random().nextInt(1000));
elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS);
assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);
}
@ -78,13 +81,13 @@ public class ActionThrottleTest extends SolrTestCaseJ4 {
public void testAZeroNanoTimeReturnInWait() throws Exception {
ActionThrottle at = new ActionThrottle("test", 1000, new TestNanoTimeSource(Arrays.asList(new Long[]{0L, 10L})));
long start = System.nanoTime();
long start = timeSource.getTime();
at.markAttemptingAction();
at.minimumWaitBetweenActions();
long elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
long elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS);
assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.TimeSource;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -42,11 +43,13 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
private AutoScaling.TriggerListener<NodeAddedTrigger.NodeAddedEvent> noFirstRunListener = event -> {
private AutoScaling.TriggerListener noFirstRunListener = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
private static final TimeSource timeSource = TimeSource.CURRENT_TIME;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(1)
@ -73,11 +76,11 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
JettySolrRunner newNode = cluster.startJettySolrRunner();
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
trigger.setListener(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
fail("NodeAddedListener was fired before the configured waitFor period");
}
} else {
@ -94,9 +97,9 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
}
} while (!fired.get());
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
TriggerEvent nodeAddedEvent = eventRef.get();
assertNotNull(nodeAddedEvent);
assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
}
// add a new node but remove it before the waitFor period expires
@ -111,7 +114,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setListener(event -> {
if (fired.compareAndSet(false, true)) {
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
fail("NodeAddedListener was fired before the configured waitFor period");
}
} else {
@ -170,7 +173,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
}
@Override
public void process(AutoScaling.TriggerEvent event) {
public void process(TriggerEvent event) {
}
@ -246,11 +249,11 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setListener(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
fail("NodeAddedListener was fired before the configured waitFor period");
}
} else {
@ -270,9 +273,9 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
// ensure the event was fired
assertTrue(fired.get());
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
TriggerEvent nodeAddedEvent = eventRef.get();
assertNotNull(nodeAddedEvent);
assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
}
}

View File

@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.TimeSource;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -43,11 +43,14 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
private AutoScaling.TriggerListener<NodeLostTrigger.NodeLostEvent> noFirstRunListener = event -> {
private AutoScaling.TriggerListener noFirstRunListener = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
// use the same time source as the trigger
private final TimeSource timeSource = TimeSource.CURRENT_TIME;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(5)
@ -75,11 +78,11 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
cluster.stopJettySolrRunner(1);
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<NodeLostTrigger.NodeLostEvent> eventRef = new AtomicReference<>();
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
trigger.setListener(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
fail("NodeLostListener was fired before the configured waitFor period");
}
} else {
@ -96,9 +99,9 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
}
} while (!fired.get());
NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
TriggerEvent nodeLostEvent = eventRef.get();
assertNotNull(nodeLostEvent);
assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
}
@ -115,7 +118,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setListener(event -> {
if (fired.compareAndSet(false, true)) {
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
fail("NodeLostListener was fired before the configured waitFor period");
}
} else {
@ -184,7 +187,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
}
@Override
public void process(AutoScaling.TriggerEvent event) {
public void process(TriggerEvent event) {
}
@ -284,11 +287,11 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<NodeLostTrigger.NodeLostEvent> eventRef = new AtomicReference<>();
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setListener(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
fail("NodeLostListener was fired before the configured waitFor period");
}
} else {
@ -306,9 +309,9 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
}
} while (!fired.get());
NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
TriggerEvent nodeLostEvent = eventRef.get();
assertNotNull(nodeLostEvent);
assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
}
}

View File

@ -39,6 +39,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.TimeSource;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.BeforeClass;
@ -60,11 +61,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
private static CountDownLatch actionInitCalled;
private static CountDownLatch triggerFiredLatch;
private static int waitForSeconds = 1;
private static CountDownLatch actionStarted;
private static CountDownLatch actionInterrupted;
private static CountDownLatch actionCompleted;
private static AtomicBoolean triggerFired;
private static AtomicReference<AutoScaling.TriggerEvent> eventRef;
private static AtomicReference<TriggerEvent> eventRef;
private String path;
// use the same time source as triggers use
private static final TimeSource timeSource = TimeSource.CURRENT_TIME;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2)
@ -72,6 +79,22 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
.configure();
}
private static CountDownLatch getTriggerFiredLatch() {
return triggerFiredLatch;
}
private static CountDownLatch getActionStarted() {
return actionStarted;
}
private static CountDownLatch getActionInterrupted() {
return actionInterrupted;
}
private static CountDownLatch getActionCompleted() {
return actionCompleted;
}
@Before
public void setupTest() throws Exception {
waitForSeconds = 1 + random().nextInt(3);
@ -79,6 +102,9 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
actionInitCalled = new CountDownLatch(1);
triggerFiredLatch = new CountDownLatch(1);
triggerFired = new AtomicBoolean(false);
actionStarted = new CountDownLatch(1);
actionInterrupted = new CountDownLatch(1);
actionCompleted = new CountDownLatch(1);
eventRef = new AtomicReference<>();
// clear any persisted auto scaling configuration
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
@ -96,7 +122,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Test
public void testTriggerThrottling() throws Exception {
// for this test we want to create two triggers so we must assert that the actions were created twice
TriggerIntegrationTest.actionInitCalled = new CountDownLatch(2);
actionInitCalled = new CountDownLatch(2);
// similarly we want both triggers to fire
triggerFiredLatch = new CountDownLatch(2);
@ -109,7 +135,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'nodeAdded'," +
"'waitFor' : '0s'," +
"'enabled' : true," +
"'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
"}}";
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
@ -122,7 +148,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'nodeAdded'," +
"'waitFor' : '0s'," +
"'enabled' : true," +
"'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
"}}";
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
@ -135,7 +161,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
JettySolrRunner newNode = cluster.startJettySolrRunner();
if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
fail("Both triggers should have fired by now");
}
@ -150,7 +176,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'nodeLost'," +
"'waitFor' : '0s'," +
"'enabled' : true," +
"'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
"}}";
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
@ -162,7 +188,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'nodeLost'," +
"'waitFor' : '0s'," +
"'enabled' : true," +
"'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
"}}";
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
@ -183,14 +209,14 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
fail("Both triggers should have fired by now");
}
}
static AtomicLong lastActionExecutedAt = new AtomicLong(0);
static ReentrantLock lock = new ReentrantLock();
public static class ThrottingTesterAction extends TestTriggerAction {
public static class ThrottlingTesterAction extends TestTriggerAction {
// nanos are very precise so we need a delta for comparison with ms
private static final long DELTA_MS = 2;
@ -198,7 +224,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
@Override
public void process(AutoScaling.TriggerEvent event) {
public void process(TriggerEvent event) {
boolean locked = lock.tryLock();
if (!locked) {
log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
@ -206,18 +232,18 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
try {
if (lastActionExecutedAt.get() != 0) {
log.info("last action at " + lastActionExecutedAt.get() + " nano time = " + System.nanoTime());
if (System.nanoTime() - lastActionExecutedAt.get() < TimeUnit.NANOSECONDS.convert(ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS, TimeUnit.MILLISECONDS)) {
log.info("action executed again before minimum wait time from {}", event.getSource().getName());
log.info("last action at " + lastActionExecutedAt.get() + " time = " + timeSource.getTime());
if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - lastActionExecutedAt.get(), TimeUnit.NANOSECONDS) < ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS) {
log.info("action executed again before minimum wait time from {}", event.getSource());
fail("TriggerListener was fired before the throttling period");
}
}
if (onlyOnce.compareAndSet(false, true)) {
log.info("action executed from {}", event.getSource().getName());
lastActionExecutedAt.set(System.nanoTime());
triggerFiredLatch.countDown();
log.info("action executed from {}", event.getSource());
lastActionExecutedAt.set(timeSource.getTime());
getTriggerFiredLatch().countDown();
} else {
log.info("action executed more than once from {}", event.getSource().getName());
log.info("action executed more than once from {}", event.getSource());
fail("Trigger should not have fired more than once!");
}
} finally {
@ -293,7 +319,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
assertNotNull(nodeLostEvent);
assertEquals("The node added trigger was fired but for a different node",
nodeName, nodeLostEvent.getNodeName());
nodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
}
@Test
@ -351,7 +377,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
newNode.getNodeName(), nodeAddedEvent.getNodeName());
newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
}
@Test
@ -380,7 +406,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
newNode.getNodeName(), nodeAddedEvent.getNodeName());
newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
// reset
actionConstructorCalled = new CountDownLatch(1);
@ -443,7 +469,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
assertNotNull(nodeLostEvent);
assertEquals("The node lost trigger was fired but for a different node",
lostNodeName, nodeLostEvent.getNodeName());
lostNodeName, nodeLostEvent.getProperty(TriggerEvent.NODE_NAME));
// reset
actionConstructorCalled = new CountDownLatch(1);
@ -507,7 +533,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// stop the overseer, somebody else will take over as the overseer
cluster.stopJettySolrRunner(index);
Thread.sleep(10000);
JettySolrRunner newNode = cluster.startJettySolrRunner();
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
@ -515,7 +541,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
newNode.getNodeName(), nodeAddedEvent.getNodeName());
newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
}
public static class TestTriggerAction implements TriggerAction {
@ -535,15 +561,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
/**
 * Processes a trigger event on behalf of the test.
 * <p>
 * The first invocation (the one that wins {@code triggerFired.compareAndSet(false, true)})
 * stores the event in {@code eventRef}, fails the test when the time elapsed since the
 * event is not greater than {@code waitForSeconds}, and counts down the trigger-fired latch.
 * Any later invocation fails the test with "NodeAddedTrigger was fired more than once!".
 * Where the body is wrapped in {@code try/catch}, any {@code Throwable} is logged at debug
 * level and rethrown unchanged.
 *
 * @param event the event delivered by the trigger
 */
@Override
public void process(AutoScaling.TriggerEvent event) {
if (triggerFired.compareAndSet(false, true)) {
eventRef.set(event);
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
fail("NodeAddedListener was fired before the configured waitFor period");
public void process(TriggerEvent event) {
try {
if (triggerFired.compareAndSet(false, true)) {
eventRef.set(event);
if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - event.getEventTime(), TimeUnit.NANOSECONDS) <= TimeUnit.MILLISECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
fail("NodeAddedListener was fired before the configured waitFor period");
}
getTriggerFiredLatch().countDown();
} else {
fail("NodeAddedTrigger was fired more than once!");
}
triggerFiredLatch.countDown();
} else {
fail("NodeAddedTrigger was fired more than once!");
} catch (Throwable t) {
log.debug("--throwable", t);
throw t;
}
}
@ -558,4 +589,152 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
actionInitCalled.countDown();
}
}
/**
 * Slow trigger action used to exercise event replay.
 * <p>
 * {@link #process(TriggerEvent)} publishes the event through {@code eventRef}, signals that it
 * has started, sleeps for five seconds and only then marks the trigger as fired and signals
 * completion. If the sleeping thread is interrupted, the action signals the interruption
 * instead and never marks the trigger as fired.
 */
public static class TestEventQueueAction implements TriggerAction {

  public TestEventQueueAction() {
    log.info("TestEventQueueAction instantiated");
  }

  /** @return the simple class name of this action */
  @Override
  public String getName() {
    return this.getClass().getSimpleName();
  }

  /** @return the fully qualified class name of this action */
  @Override
  public String getClassName() {
    return this.getClass().getName();
  }

  /**
   * Records the event, signals start, then sleeps before signalling completion.
   *
   * @param event the event delivered by the trigger
   */
  @Override
  public void process(TriggerEvent event) {
    eventRef.set(event);
    getActionStarted().countDown();
    try {
      // Sleep long enough for the test to act while the action is still "in progress".
      Thread.sleep(5000);
      triggerFired.compareAndSet(false, true);
      getActionCompleted().countDown();
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt flag is not restored here; confirm that the code
      // invoking this action does not need to observe the interruption.
      getActionInterrupted().countDown();
    }
  }

  /** Nothing to release. */
  @Override
  public void close() throws IOException {
  }

  /**
   * Signals {@code actionInitCalled} so the test knows the action has been initialized.
   *
   * @param args action configuration; not used by this action
   */
  @Override
  public void init(Map<String, String> args) {
    // Log this class's own name; the message previously named TestTriggerAction.
    log.debug("TestEventQueueAction init");
    actionInitCalled.countDown();
  }
}
/**
 * Checks that an event whose action was interrupted is delivered again.
 * <p>
 * Registers a {@code nodeAdded} trigger whose action is {@link TestEventQueueAction}, adds a
 * node to produce an event, stops the overseer leader while the action is still running, and
 * then expects the action to start again with an event that carries the
 * {@code TriggerEventQueue.ENQUEUE_TIME} and {@code TriggerEventQueue.DEQUEUE_TIME}
 * properties, and finally to complete.
 */
@Test
public void testEventQueue() throws Exception {
  CloudSolrClient solrClient = cluster.getSolrClient();
  String setTriggerCommand = "{" +
      "'set-trigger' : {" +
      "'name' : 'node_added_trigger1'," +
      "'event' : 'nodeAdded'," +
      "'waitFor' : '" + waitForSeconds + "s'," +
      "'enabled' : true," +
      "'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
      "}}";

  // Find the index of the jetty that currently hosts the overseer leader (defaults to 0).
  NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
  String overseerLeader = (String) overSeerStatus.get("leader");
  int overseerLeaderIndex = 0;
  for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
    JettySolrRunner jetty = cluster.getJettySolrRunner(i);
    if (jetty.getNodeName().equals(overseerLeader)) {
      overseerLeaderIndex = i;
      break;
    }
  }

  SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
  NamedList<Object> response = solrClient.request(req);
  // JUnit's contract is assertEquals(expected, actual).
  assertEquals("success", response.get("result").toString());

  if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
    fail("The TriggerAction should have been created by now");
  }

  // add node to generate the event
  cluster.startJettySolrRunner();
  boolean await = actionStarted.await(60, TimeUnit.SECONDS);
  assertTrue("action did not start", await);
  // event should be there
  NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
  assertNotNull(nodeAddedEvent);
  // but action did not complete yet so the event is still enqueued
  assertFalse(triggerFired.get());
  actionStarted = new CountDownLatch(1);

  // kill overseer leader
  cluster.stopJettySolrRunner(overseerLeaderIndex);
  Thread.sleep(5000);
  await = actionInterrupted.await(3, TimeUnit.SECONDS);
  assertTrue("action wasn't interrupted", await);

  // new overseer leader should be elected and run triggers
  cluster.startJettySolrRunner();
  // it should fire again but not complete yet
  await = actionStarted.await(60, TimeUnit.SECONDS);
  // Without this check a timeout would go unnoticed and the stale first event would be inspected.
  assertTrue("action did not start again after the overseer change", await);
  TriggerEvent replayedEvent = eventRef.get();
  assertNotNull(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME));
  assertNotNull(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME));

  await = actionCompleted.await(10, TimeUnit.SECONDS);
  assertTrue("action did not complete", await);
  assertTrue(triggerFired.get());
}
/**
 * Checks that a trigger fires after the overseer leader is stopped while a change is pending.
 * <p>
 * Registers a {@code nodeAdded} trigger with a 10 second {@code waitFor}, lets it fire once
 * for a first new node, then starts a second node, waits, stops the overseer leader and
 * expects the trigger to fire again.
 */
@Test
public void testEventFromRestoredState() throws Exception {
  CloudSolrClient solrClient = cluster.getSolrClient();
  String setTriggerCommand = "{" +
      "'set-trigger' : {" +
      "'name' : 'node_added_trigger'," +
      "'event' : 'nodeAdded'," +
      "'waitFor' : '10s'," +
      "'enabled' : true," +
      "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
      "}}";
  SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
  NamedList<Object> response = solrClient.request(req);
  // JUnit's contract is assertEquals(expected, actual).
  assertEquals("success", response.get("result").toString());

  if (!actionInitCalled.await(10, TimeUnit.SECONDS)) {
    fail("The TriggerAction should have been created by now");
  }

  // Find the index of the jetty that currently hosts the overseer leader (defaults to 0).
  NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
  String overseerLeader = (String) overSeerStatus.get("leader");
  int overseerLeaderIndex = 0;
  for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
    JettySolrRunner jetty = cluster.getJettySolrRunner(i);
    if (jetty.getNodeName().equals(overseerLeader)) {
      overseerLeaderIndex = i;
      break;
    }
  }

  JettySolrRunner newNode = cluster.startJettySolrRunner();
  boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
  assertTrue("The trigger did not fire at all", await);
  assertTrue(triggerFired.get());
  // reset
  triggerFired.set(false);
  triggerFiredLatch = new CountDownLatch(1);
  NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
  assertNotNull(nodeAddedEvent);
  assertEquals("The node added trigger was fired but for a different node",
      newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));

  // add a second node - state of the trigger will change but it won't fire for waitFor sec.
  cluster.startJettySolrRunner();
  Thread.sleep(10000);
  // kill overseer leader
  cluster.stopJettySolrRunner(overseerLeaderIndex);
  await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
  assertTrue("The trigger did not fire at all", await);
  assertTrue(triggerFired.get());
}
}

View File

@ -92,6 +92,8 @@ public class ZkStateReader implements Closeable {
public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
public static final String SOLR_SECURITY_CONF_PATH = "/security.json";
public static final String SOLR_AUTOSCALING_CONF_PATH = "/autoscaling.json";
public static final String SOLR_AUTOSCALING_EVENTS_PATH = "/autoscaling/events";
public static final String SOLR_AUTOSCALING_TRIGGER_STATE_PATH = "/autoscaling/triggerState";
public static final String REPLICATION_FACTOR = "replicationFactor";
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";

View File

@ -31,6 +31,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -53,33 +55,59 @@ public class Utils {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
 * Convenience overload that asks for a mutable deep copy of {@code map}.
 * <p>
 * Delegates to a more general {@code getDeepCopy} overload, passing {@code true} for
 * {@code mutable}; the four-argument call also passes {@code false} for {@code sorted}.
 *
 * @param map      the map to copy
 * @param maxDepth the depth limit forwarded to the delegate
 * @return whatever the delegate returns for these arguments
 */
public static Map getDeepCopy(Map map, int maxDepth) {
return getDeepCopy(map, maxDepth, true);
return getDeepCopy(map, maxDepth, true, false);
}
/**
 * Deep-copies {@code map} without sorting its keys.
 * <p>
 * Equivalent to calling the four-argument overload with {@code sorted} set to {@code false}.
 *
 * @param map      the map to copy
 * @param maxDepth the depth limit forwarded to the four-argument overload
 * @param mutable  forwarded unchanged to the four-argument overload
 * @return the result of the four-argument overload
 */
public static Map getDeepCopy(Map map, int maxDepth, boolean mutable) {
  final boolean keepOriginalOrder = false;
  return getDeepCopy(map, maxDepth, mutable, keepOriginalOrder);
}
/**
 * Copies {@code map} entry by entry, copying each value through {@code makeDeepCopy}.
 *
 * @param map      the map to copy; {@code null} yields {@code null}
 * @param maxDepth when less than 1 the input map itself is returned, not a copy; otherwise
 *                 the value is passed on unchanged to {@code makeDeepCopy} for every entry
 * @param mutable  when {@code false} the copy is wrapped with
 *                 {@link Collections#unmodifiableMap(Map)} before being returned
 * @param sorted   when {@code true} the copy is a {@code TreeMap}, otherwise a
 *                 {@code LinkedHashMap}; the flag is also forwarded to {@code makeDeepCopy}
 * @return the copy, {@code null} for a {@code null} input, or {@code map} itself when
 *         {@code maxDepth < 1}
 */
public static Map getDeepCopy(Map map, int maxDepth, boolean mutable, boolean sorted) {
if(map == null) return null;
if (maxDepth < 1) return map;
Map copy = new LinkedHashMap();
Map copy;
if (sorted) {
copy = new TreeMap();
} else {
copy = new LinkedHashMap();
}
for (Object o : map.entrySet()) {
Map.Entry e = (Map.Entry) o;
copy.put(e.getKey(), makeDeepCopy(e.getValue(),maxDepth, mutable));
copy.put(e.getKey(), makeDeepCopy(e.getValue(),maxDepth, mutable, sorted));
}
return mutable ? copy : Collections.unmodifiableMap(copy);
}
/**
 * Copies a single value for the {@code getDeepCopy} methods.
 * <p>
 * When {@code maxDepth > 1}, a {@code MapWriter} is first materialized with
 * {@code toMap(new LinkedHashMap<>())} and an {@code IteratorWriter} with
 * {@code toList(new ArrayList<>())}; with {@code sorted} set, that list is sorted with
 * {@link Collections#sort(List)}. Afterwards a {@code Map} or {@code Collection} value is
 * copied through the matching {@code getDeepCopy} overload with {@code maxDepth - 1}.
 * Any other value is returned as is.
 *
 * @param v        the value to copy
 * @param maxDepth the remaining depth; nested containers are copied with {@code maxDepth - 1}
 * @param mutable  forwarded to {@code getDeepCopy}
 * @param sorted   forwarded to {@code getDeepCopy}; also sorts a list produced from an
 *                 {@code IteratorWriter}
 * @return the copied value, or {@code v} itself when it is neither a map nor a collection
 */
private static Object makeDeepCopy(Object v, int maxDepth, boolean mutable) {
if (v instanceof MapWriter && maxDepth > 1) v = ((MapWriter) v).toMap(new LinkedHashMap<>());
else if (v instanceof IteratorWriter && maxDepth > 1) v = ((IteratorWriter) v).toList(new ArrayList<>());
private static Object makeDeepCopy(Object v, int maxDepth, boolean mutable, boolean sorted) {
if (v instanceof MapWriter && maxDepth > 1) {
v = ((MapWriter) v).toMap(new LinkedHashMap<>());
} else if (v instanceof IteratorWriter && maxDepth > 1) {
v = ((IteratorWriter) v).toList(new ArrayList<>());
if (sorted) {
Collections.sort((List)v);
}
}
if (v instanceof Map) v = getDeepCopy((Map) v, maxDepth - 1, mutable);
else if (v instanceof Collection) v = getDeepCopy((Collection) v, maxDepth - 1, mutable);
if (v instanceof Map) {
v = getDeepCopy((Map) v, maxDepth - 1, mutable, sorted);
} else if (v instanceof Collection) {
v = getDeepCopy((Collection) v, maxDepth - 1, mutable, sorted);
}
return v;
}
/**
 * Deep-copies {@code c} without sorting its elements.
 * <p>
 * Equivalent to calling the four-argument overload with {@code sorted} set to {@code false}.
 *
 * @param c        the collection to copy
 * @param maxDepth the depth limit forwarded to the four-argument overload
 * @param mutable  forwarded unchanged to the four-argument overload
 * @return the result of the four-argument overload
 */
public static Collection getDeepCopy(Collection c, int maxDepth, boolean mutable) {
  final boolean keepOriginalOrder = false;
  return getDeepCopy(c, maxDepth, mutable, keepOriginalOrder);
}
/**
 * Copies {@code c} element by element, copying each element through {@code makeDeepCopy}.
 * <p>
 * A {@code Set} input produces a set (a {@code TreeSet} when {@code sorted}, otherwise a
 * {@code HashSet}); any other collection produces an {@code ArrayList}, which is sorted with
 * {@link Collections#sort(List)} when {@code sorted} is set.
 *
 * @param c        the collection to copy
 * @param maxDepth when less than 1 the input itself is returned, not a copy; otherwise the
 *                 value is passed on unchanged to {@code makeDeepCopy} for every element
 * @param mutable  when {@code false} the result is wrapped as an unmodifiable set or list
 * @param sorted   selects the sorted variants described above; also forwarded to
 *                 {@code makeDeepCopy}
 * @return the copy, or {@code c} itself when it is {@code null} or {@code maxDepth < 1}
 */
public static Collection getDeepCopy(Collection c, int maxDepth, boolean mutable, boolean sorted) {
if (c == null || maxDepth < 1) return c;
Collection result = c instanceof Set ? new HashSet() : new ArrayList();
for (Object o : c) result.add(makeDeepCopy(o, maxDepth, mutable));
Collection result = c instanceof Set ?
( sorted? new TreeSet() : new HashSet()) : new ArrayList();
for (Object o : c) result.add(makeDeepCopy(o, maxDepth, mutable, sorted));
if (sorted && (result instanceof List)) {
Collections.sort((List)result);
}
return mutable ? result : result instanceof Set ? unmodifiableSet((Set) result) : unmodifiableList((List) result);
}