diff --git a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java index fbfa9c5670e..63ce808b0f0 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java +++ b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java @@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles; import java.util.concurrent.TimeUnit; +import org.apache.solr.util.TimeSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,41 +32,29 @@ public class ActionThrottle { private volatile Long minMsBetweenActions; private final String name; + private final TimeSource timeSource; - private final NanoTimeSource nanoTimeSource; - - public interface NanoTimeSource { - long getTime(); - } - - private static class DefaultNanoTimeSource implements NanoTimeSource { - @Override - public long getTime() { - return System.nanoTime(); - } - } - public ActionThrottle(String name, long minMsBetweenActions) { this.name = name; this.minMsBetweenActions = minMsBetweenActions; - this.nanoTimeSource = new DefaultNanoTimeSource(); + this.timeSource = TimeSource.NANO_TIME; } - public ActionThrottle(String name, long minMsBetweenActions, NanoTimeSource nanoTimeSource) { + public ActionThrottle(String name, long minMsBetweenActions, TimeSource timeSource) { this.name = name; this.minMsBetweenActions = minMsBetweenActions; - this.nanoTimeSource = nanoTimeSource; + this.timeSource = timeSource; } public void markAttemptingAction() { - lastActionStartedAt = nanoTimeSource.getTime(); + lastActionStartedAt = timeSource.getTime(); } public void minimumWaitBetweenActions() { if (lastActionStartedAt == null) { return; } - long diff = nanoTimeSource.getTime() - lastActionStartedAt; + long diff = timeSource.getTime() - lastActionStartedAt; int diffMs = (int) TimeUnit.MILLISECONDS.convert(diff, TimeUnit.NANOSECONDS); long minNsBetweenActions = TimeUnit.NANOSECONDS.convert(minMsBetweenActions, TimeUnit.MILLISECONDS); log.info("The last {} attempt started {}ms ago.", name, diffMs); diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index add10e26a50..dc7605ab310 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -664,6 +664,8 @@ public class ZkController { cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient); cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient); cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient); + cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH, zkClient); + cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, zkClient); byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8); cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient); cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient); diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java index 08ceb27b5be..223629154e7 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java @@ -48,24 +48,15 @@ public class AutoScaling { AFTER_ACTION } - public static interface TriggerEvent { - public T getSource(); - - public long getEventNanoTime(); - - 
public void setContext(Map context); - - public Map getContext(); - } - - public static interface TriggerListener> { + public interface TriggerListener { /** * This method is executed when a trigger is ready to fire. * * @param event a subclass of {@link TriggerEvent} - * @return true if the listener was ready to perform actions on the event, false otherwise. + * @return true if the listener was ready to perform actions on the event, false + * otherwise. If false was returned then callers should assume the event was discarded. */ - public boolean triggerFired(E event); + boolean triggerFired(TriggerEvent event); } public static class HttpCallbackListener implements TriggerListener { @@ -78,7 +69,7 @@ public class AutoScaling { /** * Interface for a Solr trigger. Each trigger implements Runnable and Closeable interface. A trigger * is scheduled using a {@link java.util.concurrent.ScheduledExecutorService} so it is executed as - * per a configured schedule to check whether the trigger is ready to fire. The {@link #setListener(TriggerListener)} + * per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setListener(TriggerListener)} * method should be used to set a callback listener which is fired by implementation of this class whenever * ready. *
@@ -92,29 +83,47 @@ public class AutoScaling { * with the proper trigger event object. If that method returns false then it should be interpreted to mean * that Solr is not ready to process this trigger event and therefore we should retain the state and fire * at the next invocation of the run() method. - * - * @param the {@link TriggerEvent} which is handled by this Trigger */ - public static interface Trigger> extends Closeable, Runnable { - public String getName(); + public interface Trigger extends Closeable, Runnable { + /** + * Trigger name. + */ + String getName(); - public EventType getEventType(); + /** + * Event type generated by this trigger. + */ + EventType getEventType(); - public boolean isEnabled(); + /** Returns true if this trigger is enabled. */ + boolean isEnabled(); - public Map getProperties(); + /** Trigger properties. */ + Map getProperties(); - public int getWaitForSecond(); + /** Number of seconds to wait between fired events ("waitFor" property). */ + int getWaitForSecond(); - public List getActions(); + /** Actions to execute when event is fired. */ + List getActions(); - public void setListener(TriggerListener listener); + /** Set event listener to call when event is fired. */ + void setListener(TriggerListener listener); - public TriggerListener getListener(); + /** Get event listener. */ + TriggerListener getListener(); - public boolean isClosed(); + /** Return true when this trigger is closed and cannot be used. */ + boolean isClosed(); - public void restoreState(Trigger old); + /** Set internal state of this trigger from another instance. */ + void restoreState(Trigger old); + + /** Save internal state of this trigger in ZooKeeper. */ + void saveState(); + + /** Restore internal state of this trigger from ZooKeeper. */ + void restoreState(); /** * Called before a trigger is scheduled. 
Any heavy object creation or initialisation should diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java index 7617f1334b3..cd183078fc2 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java @@ -39,7 +39,6 @@ import org.apache.solr.client.solrj.impl.SolrClientDataProvider; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.util.SuppressForbidden; import org.apache.solr.common.util.Utils; import org.apache.solr.core.CoreContainer; import org.apache.solr.handler.RequestHandlerBase; @@ -49,6 +48,7 @@ import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.security.AuthorizationContext; import org.apache.solr.security.PermissionNameProvider; import org.apache.solr.util.CommandOperation; +import org.apache.solr.util.TimeSource; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -66,6 +66,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission protected final CoreContainer container; private final List> DEFAULT_ACTIONS = new ArrayList<>(3); private static ImmutableSet singletonCommands = ImmutableSet.of("set-cluster-preferences", "set-cluster-policy"); + private static final TimeSource timeSource = TimeSource.CURRENT_TIME; public AutoScalingHandler(CoreContainer container) { @@ -250,7 +251,6 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission rsp.getValues().add("result", "success"); } - @SuppressForbidden(reason = "currentTimeMillis is used to find the resume time for the trigger") private void handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException { String triggerName = op.getStr("name"); @@ -263,7 +263,8 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission if (timeout != null) { try { int timeoutSeconds = parseHumanTime(timeout); - resumeTime = new Date(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS)); + resumeTime = new Date(TimeUnit.MILLISECONDS.convert(timeSource.getTime(), TimeUnit.NANOSECONDS) + + TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS)); } catch (IllegalArgumentException e) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'timeout' value for suspend trigger: " + triggerName); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java index 3e7aff60a1c..1b8e680a4c3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java @@ -45,7 +45,7 @@ public class ComputePlanAction implements TriggerAction { } @Override - public void process(AutoScaling.TriggerEvent event) { + public void process(TriggerEvent event) { } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java index 59509481baa..90a7cf79e22 100644 --- 
a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java @@ -45,7 +45,7 @@ public class ExecutePlanAction implements TriggerAction { } @Override - public void process(AutoScaling.TriggerEvent event) { + public void process(TriggerEvent event) { } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java index fc86c96da0d..f89e8d9c588 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java @@ -45,7 +45,7 @@ public class LogPlanAction implements TriggerAction { } @Override - public void process(AutoScaling.TriggerEvent event) { + public void process(TriggerEvent event) { } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java index 93f8e84c4e1..642ae563c21 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java @@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -35,23 +36,25 @@ import org.apache.lucene.util.IOUtils; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.core.CoreContainer; +import org.apache.solr.util.TimeSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Trigger for the {@link org.apache.solr.cloud.autoscaling.AutoScaling.EventType#NODEADDED} event */ -public class NodeAddedTrigger implements AutoScaling.Trigger { +public class NodeAddedTrigger extends TriggerBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final String name; private final Map properties; private final CoreContainer container; private final List actions; - private final AtomicReference> listenerRef; + private final AtomicReference listenerRef; private final boolean enabled; private final int waitForSecond; private final AutoScaling.EventType eventType; + private final TimeSource timeSource; private boolean isClosed = false; @@ -61,9 +64,11 @@ public class NodeAddedTrigger implements AutoScaling.Trigger properties, CoreContainer container) { + super(container.getZkController().getZkClient()); this.name = name; this.properties = properties; this.container = container; + this.timeSource = TimeSource.CURRENT_TIME; this.listenerRef = new AtomicReference<>(); List> o = (List>) properties.get("actions"); if (o != null && !o.isEmpty()) { @@ -75,6 +80,8 @@ public class NodeAddedTrigger implements AutoScaling.Trigger(container.getZkController().getZkStateReader().getClusterState().getLiveNodes()); + log.debug("Initial livenodes: {}", lastLiveNodes); this.enabled = (boolean) properties.getOrDefault("enabled", true); this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue(); this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT)); @@ -93,12 +100,12 @@ public class NodeAddedTrigger implements AutoScaling.Trigger listener) { + public void 
setListener(AutoScaling.TriggerListener listener) { listenerRef.set(listener); } @Override - public AutoScaling.TriggerListener getListener() { + public AutoScaling.TriggerListener getListener() { return listenerRef.get(); } @@ -156,7 +163,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger old) { + public void restoreState(AutoScaling.Trigger old) { assert old.isClosed(); if (old instanceof NodeAddedTrigger) { NodeAddedTrigger that = (NodeAddedTrigger) old; @@ -169,6 +176,28 @@ public class NodeAddedTrigger implements AutoScaling.Trigger getState() { + Map state = new HashMap<>(); + state.put("lastLiveNodes", lastLiveNodes); + state.put("nodeNameVsTimeAdded", nodeNameVsTimeAdded); + return state; + } + + @Override + protected void setState(Map state) { + this.lastLiveNodes.clear(); + this.nodeNameVsTimeAdded.clear(); + Collection lastLiveNodes = (Collection)state.get("lastLiveNodes"); + if (lastLiveNodes != null) { + this.lastLiveNodes.addAll(lastLiveNodes); + } + Map nodeNameVsTimeAdded = (Map)state.get("nodeNameVsTimeAdded"); + if (nodeNameVsTimeAdded != null) { + this.nodeNameVsTimeAdded.putAll(nodeNameVsTimeAdded); + } + } + @Override public void run() { try { @@ -183,10 +212,6 @@ public class NodeAddedTrigger implements AutoScaling.Trigger newLiveNodes = reader.getClusterState().getLiveNodes(); log.debug("Found livenodes: {}", newLiveNodes); - if (lastLiveNodes == null) { - lastLiveNodes = newLiveNodes; - return; - } // have any nodes that we were tracking been removed from the cluster? // if so, remove them from the tracking map @@ -197,22 +222,22 @@ public class NodeAddedTrigger implements AutoScaling.Trigger copyOfNew = new HashSet<>(newLiveNodes); copyOfNew.removeAll(lastLiveNodes); copyOfNew.forEach(n -> { - long nanoTime = System.nanoTime(); - nodeNameVsTimeAdded.put(n, nanoTime); - log.info("Tracking new node: {} at {} nanotime", n, nanoTime); + long eventTime = timeSource.getTime(); + nodeNameVsTimeAdded.put(n, eventTime); + log.debug("Tracking new node: {} at time {}", n, eventTime); }); // has enough time expired to trigger events for a node? for (Map.Entry entry : nodeNameVsTimeAdded.entrySet()) { String nodeName = entry.getKey(); Long timeAdded = entry.getValue(); - long now = System.nanoTime(); + long now = timeSource.getTime(); if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) { // fire! 
- AutoScaling.TriggerListener listener = listenerRef.get(); + AutoScaling.TriggerListener listener = listenerRef.get(); if (listener != null) { - log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now); - if (listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName))) { + log.debug("NodeAddedTrigger {} firing registered listener for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now); + if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) { // remove from tracking set only if the fire was accepted trackingKeySet.remove(nodeName); } @@ -222,7 +247,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger { - private final NodeAddedTrigger source; - private final long nodeAddedNanoTime; - private final String nodeName; + public static class NodeAddedEvent extends TriggerEvent { - private Map context; - - public NodeAddedEvent(NodeAddedTrigger source, long nodeAddedNanoTime, String nodeAdded) { - this.source = source; - this.nodeAddedNanoTime = nodeAddedNanoTime; - this.nodeName = nodeAdded; - } - - @Override - public NodeAddedTrigger getSource() { - return source; - } - - @Override - public long getEventNanoTime() { - return nodeAddedNanoTime; - } - - public String getNodeName() { - return nodeName; - } - - public AutoScaling.EventType getType() { - return source.getEventType(); - } - - @Override - public void setContext(Map context) { - this.context = context; - } - - @Override - public Map getContext() { - return context; + public NodeAddedEvent(AutoScaling.EventType eventType, String source, long nodeAddedTime, String nodeAdded) { + super(eventType, source, nodeAddedTime, Collections.singletonMap(NODE_NAME, nodeAdded)); } } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java index b755f25c570..76c59db2a7f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java @@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -35,23 +36,25 @@ import org.apache.lucene.util.IOUtils; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.core.CoreContainer; +import org.apache.solr.util.TimeSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Trigger for the {@link AutoScaling.EventType#NODELOST} event */ -public class NodeLostTrigger implements AutoScaling.Trigger { +public class NodeLostTrigger extends TriggerBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final String name; private final Map properties; private final CoreContainer container; private final List actions; - private final AtomicReference> listenerRef; + private final AtomicReference listenerRef; private final boolean enabled; private final int waitForSecond; private final AutoScaling.EventType eventType; + private final TimeSource timeSource; private boolean isClosed = false; @@ -61,9 +64,11 @@ public class NodeLostTrigger implements AutoScaling.Trigger properties, CoreContainer container) { + 
super(container.getZkController().getZkClient()); this.name = name; this.properties = properties; this.container = container; + this.timeSource = TimeSource.CURRENT_TIME; this.listenerRef = new AtomicReference<>(); List> o = (List>) properties.get("actions"); if (o != null && !o.isEmpty()) { @@ -75,7 +80,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger(container.getZkController().getZkStateReader().getClusterState().getLiveNodes()); log.debug("Initial livenodes: {}", lastLiveNodes); this.enabled = (boolean) properties.getOrDefault("enabled", true); this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue(); @@ -94,12 +99,12 @@ public class NodeLostTrigger implements AutoScaling.Trigger listener) { + public void setListener(AutoScaling.TriggerListener listener) { listenerRef.set(listener); } @Override - public AutoScaling.TriggerListener getListener() { + public AutoScaling.TriggerListener getListener() { return listenerRef.get(); } @@ -157,7 +162,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger old) { + public void restoreState(AutoScaling.Trigger old) { assert old.isClosed(); if (old instanceof NodeLostTrigger) { NodeLostTrigger that = (NodeLostTrigger) old; @@ -170,6 +175,28 @@ public class NodeLostTrigger implements AutoScaling.Trigger getState() { + Map state = new HashMap<>(); + state.put("lastLiveNodes", lastLiveNodes); + state.put("nodeNameVsTimeRemoved", nodeNameVsTimeRemoved); + return state; + } + + @Override + protected void setState(Map state) { + this.lastLiveNodes.clear(); + this.nodeNameVsTimeRemoved.clear(); + Collection lastLiveNodes = (Collection)state.get("lastLiveNodes"); + if (lastLiveNodes != null) { + this.lastLiveNodes.addAll(lastLiveNodes); + } + Map nodeNameVsTimeRemoved = (Map)state.get("nodeNameVsTimeRemoved"); + if (nodeNameVsTimeRemoved != null) { + this.nodeNameVsTimeRemoved.putAll(nodeNameVsTimeRemoved); + } + } + @Override public void run() { try { @@ -194,20 +221,20 @@ public class NodeLostTrigger implements AutoScaling.Trigger copyOfLastLiveNodes = new HashSet<>(lastLiveNodes); copyOfLastLiveNodes.removeAll(newLiveNodes); copyOfLastLiveNodes.forEach(n -> { - log.info("Tracking lost node: {}", n); - nodeNameVsTimeRemoved.put(n, System.nanoTime()); + log.debug("Tracking lost node: {}", n); + nodeNameVsTimeRemoved.put(n, timeSource.getTime()); }); // has enough time expired to trigger events for a node? for (Map.Entry entry : nodeNameVsTimeRemoved.entrySet()) { String nodeName = entry.getKey(); Long timeRemoved = entry.getValue(); - if (TimeUnit.SECONDS.convert(System.nanoTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) { + if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) { // fire! 
- AutoScaling.TriggerListener listener = listenerRef.get(); + AutoScaling.TriggerListener listener = listenerRef.get(); if (listener != null) { - log.info("NodeLostTrigger firing registered listener"); - if (listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName))) { + log.debug("NodeLostTrigger firing registered listener"); + if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) { trackingKeySet.remove(nodeName); } } else { @@ -216,7 +243,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger(newLiveNodes); } catch (RuntimeException e) { log.error("Unexpected exception in NodeLostTrigger", e); } @@ -229,45 +256,10 @@ public class NodeLostTrigger implements AutoScaling.Trigger { - private final NodeLostTrigger source; - private final long nodeLostNanoTime; - private final String nodeName; + public static class NodeLostEvent extends TriggerEvent { - private Map context; - - public NodeLostEvent(NodeLostTrigger source, long nodeLostNanoTime, String nodeRemoved) { - this.source = source; - this.nodeLostNanoTime = nodeLostNanoTime; - this.nodeName = nodeRemoved; - } - - @Override - public NodeLostTrigger getSource() { - return source; - } - - @Override - public long getEventNanoTime() { - return nodeLostNanoTime; - } - - public String getNodeName() { - return nodeName; - } - - public AutoScaling.EventType getType() { - return source.getEventType(); - } - - @Override - public void setContext(Map context) { - this.context = context; - } - - @Override - public Map getContext() { - return context; + public NodeLostEvent(AutoScaling.EventType eventType, String source, long nodeLostTime, String nodeRemoved) { + super(eventType, source, nodeLostTime, Collections.singletonMap(NODE_NAME, nodeRemoved)); } } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java index cd619c1c8c1..4a89ce7920d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -77,7 +78,7 @@ public class OverseerTriggerThread implements Runnable, Closeable { this.zkController = zkController; zkStateReader = zkController.getZkStateReader(); zkClient = zkController.getZkClient(); - scheduledTriggers = new ScheduledTriggers(); + scheduledTriggers = new ScheduledTriggers(zkClient); triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer()); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java index 37cb9c03034..6180b358b2f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java @@ -21,11 +21,11 @@ import java.io.Closeable; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; @@ -35,9 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.store.AlreadyClosedException; import org.apache.solr.cloud.ActionThrottle; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.IOUtils; import org.apache.solr.util.DefaultSolrThreadFactory; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +54,7 @@ public class ScheduledTriggers implements Closeable { static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1; static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000; - private final Map scheduledTriggers = new HashMap<>(); + private final Map scheduledTriggers = new ConcurrentHashMap<>(); /** * Thread pool for scheduling the triggers @@ -70,7 +74,11 @@ public class ScheduledTriggers implements Closeable { private final ActionThrottle actionThrottle; - public ScheduledTriggers() { + private final SolrZkClient zkClient; + + private final Overseer.Stats queueStats; + + public ScheduledTriggers(SolrZkClient zkClient) { // todo make the core pool size configurable // it is important to use more than one because a time taking trigger can starve other scheduled triggers // ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand @@ -83,6 +91,8 @@ public class ScheduledTriggers implements Closeable { actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor")); // todo make the wait time configurable actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS); + this.zkClient = zkClient; + queueStats = new Overseer.Stats(); } /** @@ -97,7 +107,7 @@ public class ScheduledTriggers implements Closeable { if (isClosed) { throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore"); } - ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger); + ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger, zkClient, queueStats); ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger); if (old != null) { if (old.trigger.equals(newTrigger)) { @@ -106,20 +116,34 @@ public class ScheduledTriggers implements Closeable { } IOUtils.closeQuietly(old); newTrigger.restoreState(old.trigger); + scheduledTrigger.setReplay(false); scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger); } newTrigger.setListener(event -> { - AutoScaling.Trigger source = event.getSource(); + ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource()); + if (scheduledSource == null) { + log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + event.getSource() + " doesn't exist."); + return false; + } + boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? 
(Boolean)event.getProperty(TriggerEvent.REPLAYING) : false; + AutoScaling.Trigger source = scheduledSource.trigger; if (source.isClosed()) { - log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed"); - // we do not want to lose this event just because the trigger were closed, perhaps a replacement will need it + log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + source + " has already been closed"); + // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it return false; } if (hasPendingActions.compareAndSet(false, true)) { + final boolean enqueued; + if (replaying) { + enqueued = false; + } else { + enqueued = scheduledTrigger.enqueue(event); + } List actions = source.getActions(); if (actions != null) { actionExecutor.submit(() -> { assert hasPendingActions.get(); + log.debug("-- processing actions for " + event); try { // let the action executor thread wait instead of the trigger thread so we use the throttle here actionThrottle.minimumWaitBetweenActions(); @@ -132,10 +156,23 @@ public class ScheduledTriggers implements Closeable { throw e; } } + if (enqueued) { + TriggerEvent ev = scheduledTrigger.dequeue(); + assert ev.getId().equals(event.getId()); + } } finally { hasPendingActions.set(false); } }); + } else { + if (enqueued) { + TriggerEvent ev = scheduledTrigger.dequeue(); + if (!ev.getId().equals(event.getId())) { + throw new RuntimeException("Wrong event dequeued, queue of " + scheduledTrigger.trigger.getName() + + " is broken! Expected event=" + event + " but got " + ev); + } + } + hasPendingActions.set(false); } return true; } else { @@ -148,19 +185,37 @@ public class ScheduledTriggers implements Closeable { } /** - * Removes and stops the trigger with the given name + * Removes and stops the trigger with the given name. Also cleans up any leftover + * state / events in ZK. 
* * @param triggerName the name of the trigger to be removed - * @throws AlreadyClosedException if this class has already been closed */ public synchronized void remove(String triggerName) { - if (isClosed) { - throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used any more"); - } ScheduledTrigger removed = scheduledTriggers.remove(triggerName); IOUtils.closeQuietly(removed); + removeTriggerZKData(triggerName); } + private void removeTriggerZKData(String triggerName) { + String statePath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName; + String eventsPath = ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName; + try { + if (zkClient.exists(statePath, true)) { + zkClient.delete(statePath, -1, true); + } + } catch (KeeperException | InterruptedException e) { + log.warn("Failed to remove state for removed trigger " + statePath, e); + } + try { + if (zkClient.exists(eventsPath, true)) { + zkClient.delete(eventsPath, -1, true); + } + } catch (KeeperException | InterruptedException e) { + log.warn("Failed to remove events for removed trigger " + eventsPath, e); + } + } + + /** * @return an unmodifiable set of names of all triggers being managed by this class */ @@ -178,36 +233,91 @@ public class ScheduledTriggers implements Closeable { } scheduledTriggers.clear(); } - ExecutorUtil.shutdownAndAwaitTermination(scheduledThreadPoolExecutor); - ExecutorUtil.shutdownAndAwaitTermination(actionExecutor); + // shutdown and interrupt all running tasks because there's no longer any + // guarantee about cluster state + scheduledThreadPoolExecutor.shutdownNow(); + actionExecutor.shutdownNow(); } private class ScheduledTrigger implements Runnable, Closeable { AutoScaling.Trigger trigger; ScheduledFuture scheduledFuture; + TriggerEventQueue queue; + boolean replay; + volatile boolean isClosed; - ScheduledTrigger(AutoScaling.Trigger trigger) { + ScheduledTrigger(AutoScaling.Trigger trigger, SolrZkClient zkClient, Overseer.Stats stats) { this.trigger = trigger; + this.queue = new TriggerEventQueue(zkClient, trigger.getName(), stats); + this.replay = true; + this.isClosed = false; + } + + public void setReplay(boolean replay) { + this.replay = replay; + } + + public boolean enqueue(TriggerEvent event) { + if (isClosed) { + throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed."); + } + return queue.offerEvent(event); + } + + public TriggerEvent dequeue() { + if (isClosed) { + throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed."); + } + TriggerEvent event = queue.pollEvent(); + return event; } @Override public void run() { + if (isClosed) { + throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed."); + } // fire a trigger only if an action is not pending // note this is not fool proof e.g. it does not prevent an action being executed while a trigger // is still executing. There is additional protection against that scenario in the event listener. if (!hasPendingActions.get()) { + // replay accumulated events on first run, if any + if (replay) { + TriggerEvent event; + // peek first without removing - we may crash before calling the listener + while ((event = queue.peekEvent()) != null) { + // override REPLAYING=true + event.getProperties().put(TriggerEvent.REPLAYING, true); + if (! 
trigger.getListener().triggerFired(event)) { + log.error("Failed to re-play event, discarding: " + event); + } + queue.pollEvent(); // always remove it from queue + } + // now restore saved state to possibly generate new events from old state on the first run + try { + trigger.restoreState(); + } catch (Exception e) { + // log but don't throw - see below + log.error("Error restoring trigger state " + trigger.getName(), e); + } + replay = false; + } try { trigger.run(); } catch (Exception e) { // log but do not propagate exception because an exception thrown from a scheduled operation // will suppress future executions log.error("Unexpected execution from trigger: " + trigger.getName(), e); + } finally { + // checkpoint after each run + trigger.saveState(); } } } @Override public void close() throws IOException { + isClosed = true; if (scheduledFuture != null) { scheduledFuture.cancel(true); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java index 242c9de1232..b00dfd0da86 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java @@ -30,5 +30,5 @@ public interface TriggerAction extends MapInitializedPlugin, Closeable { public String getClassName(); - public void process(AutoScaling.TriggerEvent event); + public void process(TriggerEvent event); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java new file mode 100644 index 00000000000..ef9a3cfb6f3 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.autoscaling; + +import java.lang.invoke.MethodHandles; +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.Utils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} implementations. + * It handles state snapshot / restore in ZK. 
+ */
+public abstract class TriggerBase implements AutoScaling.Trigger {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected SolrZkClient zkClient;
+  protected Map<String, Object> lastState;
+
+
+  protected TriggerBase(SolrZkClient zkClient) {
+    this.zkClient = zkClient;
+    try {
+      zkClient.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, false, true);
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception checking ZK path " + ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, e);
+    }
+  }
+
+  /**
+   * Prepare and return internal state of this trigger in a format suitable for persisting in ZK.
+   * @return map of internal state properties. Note: values must be supported by {@link Utils#toJSON(Object)}.
+   */
+  protected abstract Map<String, Object> getState();
+
+  /**
+   * Restore internal state of this trigger from properties retrieved from ZK.
+   * @param state never null but may be empty.
+   */
+  protected abstract void setState(Map<String, Object> state);
+
+  @Override
+  public void saveState() {
+    Map<String, Object> state = Utils.getDeepCopy(getState(), 10, false, true);
+    if (lastState != null && lastState.equals(state)) {
+      // skip saving if identical
+      return;
+    }
+    byte[] data = Utils.toJSON(state);
+    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
+    try {
+      if (zkClient.exists(path, true)) {
+        // update
+        zkClient.setData(path, data, -1, true);
+      } else {
+        // create
+        zkClient.create(path, data, CreateMode.PERSISTENT, true);
+      }
+      lastState = state;
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception updating trigger state '" + path + "'", e);
+    }
+  }
+
+  @Override
+  public void restoreState() {
+    byte[] data = null;
+    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
+    try {
+      if (zkClient.exists(path, true)) {
+        data = zkClient.getData(path, null, new Stat(), true);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception getting trigger state '" + path + "'", e);
+    }
+    if (data != null) {
+      Map<String, Object> state = (Map<String, Object>) Utils.fromJSON(data);
+      // make sure lastState is sorted
+      state = Utils.getDeepCopy(state, 10, false, true);
+      setState(state);
+      lastState = state;
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
new file mode 100644
index 00000000000..fa2775929a4
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.util.IdUtils;
+
+/**
+ * Trigger event.
+ */
+public class TriggerEvent implements MapWriter {
+  public static final String REPLAYING = "replaying";
+  public static final String NODE_NAME = "nodeName";
+
+  protected final String id;
+  protected final String source;
+  protected final long eventTime;
+  protected final AutoScaling.EventType eventType;
+  protected final Map<String, Object> properties = new HashMap<>();
+
+  public TriggerEvent(AutoScaling.EventType eventType, String source, long eventTime,
+                      Map<String, Object> properties) {
+    this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties);
+  }
+
+  public TriggerEvent(String id, AutoScaling.EventType eventType, String source, long eventTime,
+                      Map<String, Object> properties) {
+    this.id = id;
+    this.eventType = eventType;
+    this.source = source;
+    this.eventTime = eventTime;
+    if (properties != null) {
+      this.properties.putAll(properties);
+    }
+  }
+
+  /**
+   * Unique event id.
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Name of the trigger that fired the event.
+   */
+  public String getSource() {
+    return source;
+  }
+
+  /**
+   * Timestamp of the actual event, in nanoseconds.
+   * NOTE: this is NOT the timestamp when the event was fired - events may be fired
+   * much later than the actual condition that generated the event, due to the "waitFor" limit.
+   */
+  public long getEventTime() {
+    return eventTime;
+  }
+
+  /**
+   * Get event properties (modifiable).
+   */
+  public Map<String, Object> getProperties() {
+    return properties;
+  }
+
+  /**
+   * Get a named event property or null if missing.
+   */
+  public Object getProperty(String name) {
+    return properties.get(name);
+  }
+
+  /**
+   * Event type.
+   */
+  public AutoScaling.EventType getEventType() {
+    return eventType;
+  }
+
+  /**
+   * Set event properties.
+   *
+   * @param properties may be null. A shallow copy of this parameter is used.
+ */ + public void setProperties(Map properties) { + this.properties.clear(); + if (properties != null) { + this.properties.putAll(properties); + } + } + + @Override + public void writeMap(EntryWriter ew) throws IOException { + ew.put("id", id); + ew.put("source", source); + ew.put("eventTime", eventTime); + ew.put("eventType", eventType.toString()); + ew.put("properties", properties); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TriggerEvent that = (TriggerEvent) o; + + if (eventTime != that.eventTime) return false; + if (!id.equals(that.id)) return false; + if (!source.equals(that.source)) return false; + if (eventType != that.eventType) return false; + return properties.equals(that.properties); + } + + @Override + public int hashCode() { + int result = id.hashCode(); + result = 31 * result + source.hashCode(); + result = 31 * result + (int) (eventTime ^ (eventTime >>> 32)); + result = 31 * result + eventType.hashCode(); + result = 31 * result + properties.hashCode(); + return result; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "{" + + "id='" + id + '\'' + + ", source='" + source + '\'' + + ", eventTime=" + eventTime + + ", properties=" + properties + + '}'; + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java new file mode 100644 index 00000000000..3a73f544363 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java @@ -0,0 +1,100 @@ +package org.apache.solr.cloud.autoscaling; + +import java.lang.invoke.MethodHandles; +import java.util.Map; + +import org.apache.solr.cloud.DistributedQueue; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.Utils; +import org.apache.solr.util.TimeSource; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class TriggerEventQueue extends DistributedQueue { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String ENQUEUE_TIME = "_enqueue_time_"; + public static final String DEQUEUE_TIME = "_dequeue_time_"; + + private final String triggerName; + private final TimeSource timeSource; + + public TriggerEventQueue(SolrZkClient zookeeper, String triggerName, Overseer.Stats stats) { + super(zookeeper, ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName, stats); + this.triggerName = triggerName; + this.timeSource = TimeSource.CURRENT_TIME; + } + + public boolean offerEvent(TriggerEvent event) { + event.getProperties().put(ENQUEUE_TIME, timeSource.getTime()); + try { + byte[] data = Utils.toJSON(event); + offer(data); + return true; + } catch (KeeperException | InterruptedException e) { + LOG.warn("Exception adding event " + event + " to queue " + triggerName, e); + return false; + } + } + + public TriggerEvent peekEvent() { + byte[] data; + try { + while ((data = peek()) != null) { + if (data.length == 0) { + LOG.warn("ignoring empty data..."); + continue; + } + try { + Map map = (Map) Utils.fromJSON(data); + return fromMap(map); + } catch (Exception e) { + LOG.warn("Invalid event data, ignoring: " + new String(data)); + continue; + } + } + } catch (KeeperException | 
InterruptedException e) { + LOG.warn("Exception peeking queue of trigger " + triggerName, e); + } + return null; + } + + public TriggerEvent pollEvent() { + byte[] data; + try { + while ((data = poll()) != null) { + if (data.length == 0) { + LOG.warn("ignoring empty data..."); + continue; + } + try { + Map map = (Map) Utils.fromJSON(data); + return fromMap(map); + } catch (Exception e) { + LOG.warn("Invalid event data, ignoring: " + new String(data)); + continue; + } + } + } catch (KeeperException | InterruptedException e) { + LOG.warn("Exception polling queue of trigger " + triggerName, e); + } + return null; + } + + private TriggerEvent fromMap(Map map) { + String id = (String)map.get("id"); + String source = (String)map.get("source"); + long eventTime = ((Number)map.get("eventTime")).longValue(); + AutoScaling.EventType eventType = AutoScaling.EventType.valueOf((String)map.get("eventType")); + Map properties = (Map)map.get("properties"); + TriggerEvent res = new TriggerEvent(id, eventType, source, eventTime, properties); + res.getProperties().put(DEQUEUE_TIME, timeSource.getTime()); + return res; + } +} diff --git a/solr/core/src/java/org/apache/solr/util/IdUtils.java b/solr/core/src/java/org/apache/solr/util/IdUtils.java new file mode 100644 index 00000000000..1ad9cf8fc24 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/util/IdUtils.java @@ -0,0 +1,39 @@ +package org.apache.solr.util; + +import java.util.concurrent.TimeUnit; + +import org.apache.lucene.util.StringHelper; + +/** + * Helper class for generating unique ID-s. + */ +public class IdUtils { + + /** + * Generate a short random id (see {@link StringHelper#randomId()}). + */ + public static final String randomId() { + return StringHelper.idToString(StringHelper.randomId()); + } + + /** + * Generate a random id with a timestamp, in the format: + * hex(timestamp) + 'T' + randomId. This method + * uses {@link TimeSource#CURRENT_TIME} for timestamp values. + */ + public static final String timeRandomId() { + return timeRandomId(TimeUnit.MILLISECONDS.convert(TimeSource.CURRENT_TIME.getTime(), TimeUnit.NANOSECONDS)); + } + + /** + * Generate a random id with a timestamp, in the format: + * hex(timestamp) + 'T' + randomId. + * @param time value representing timestamp + */ + public static final String timeRandomId(long time) { + StringBuilder sb = new StringBuilder(Long.toHexString(time)); + sb.append('T'); + sb.append(randomId()); + return sb.toString(); + } +} diff --git a/solr/core/src/java/org/apache/solr/util/TimeSource.java b/solr/core/src/java/org/apache/solr/util/TimeSource.java new file mode 100644 index 00000000000..a0c7bc0a910 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/util/TimeSource.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.util; + +import java.util.concurrent.TimeUnit; + +import org.apache.solr.common.util.SuppressForbidden; + +/** + * Source of timestamps. + */ +public abstract class TimeSource { + + /** Implementation that uses {@link System#currentTimeMillis()}. */ + public static final class CurrentTimeSource extends TimeSource { + + @Override + @SuppressForbidden(reason = "Needed to provide timestamps based on currentTimeMillis.") + public long getTime() { + return TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + } + + /** Implementation that uses {@link System#nanoTime()}. */ + public static final class NanoTimeSource extends TimeSource { + + @Override + public long getTime() { + return System.nanoTime(); + } + } + + /** This instance uses {@link CurrentTimeSource} for generating timestamps. */ + public static final TimeSource CURRENT_TIME = new CurrentTimeSource(); + + /** This instance uses {@link NanoTimeSource} for generating timestamps. */ + public static final TimeSource NANO_TIME = new NanoTimeSource(); + + /** + * Return a timestamp, in nanosecond unit. + */ + public abstract long getTime(); +} diff --git a/solr/core/src/test/org/apache/solr/cloud/ActionThrottleTest.java b/solr/core/src/test/org/apache/solr/cloud/ActionThrottleTest.java index ca256d34f12..c8778999a77 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ActionThrottleTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ActionThrottleTest.java @@ -21,12 +21,12 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.solr.SolrTestCaseJ4; -import org.apache.solr.cloud.ActionThrottle.NanoTimeSource; +import org.apache.solr.util.TimeSource; import org.junit.Test; public class ActionThrottleTest extends SolrTestCaseJ4 { - static class TestNanoTimeSource implements NanoTimeSource { + static class TestNanoTimeSource extends TimeSource { private List returnValues; private int index = 0; @@ -41,35 +41,38 @@ public class ActionThrottleTest extends SolrTestCaseJ4 { } } + + // use the same time source as ActionThrottle + private static final TimeSource timeSource = TimeSource.NANO_TIME; @Test public void testBasics() throws Exception { ActionThrottle at = new ActionThrottle("test", 1000); - long start = System.nanoTime(); + long start = timeSource.getTime(); at.minimumWaitBetweenActions(); // should be no wait - assertTrue(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS) < 1000); + assertTrue(TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS) < 1000); at.markAttemptingAction(); if (random().nextBoolean()) Thread.sleep(100); at.minimumWaitBetweenActions(); - long elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS); + long elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS); assertTrue(elaspsedTime + "ms", elaspsedTime >= 995); - start = System.nanoTime(); + start = timeSource.getTime(); at.markAttemptingAction(); at.minimumWaitBetweenActions(); Thread.sleep(random().nextInt(1000)); - elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS); + elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS); assertTrue(elaspsedTime + "ms", elaspsedTime >= 995); } @@ -78,13 +81,13 @@ public class ActionThrottleTest extends SolrTestCaseJ4 { public void testAZeroNanoTimeReturnInWait() throws Exception { ActionThrottle at = new 
ActionThrottle("test", 1000, new TestNanoTimeSource(Arrays.asList(new Long[]{0L, 10L}))); - long start = System.nanoTime(); + long start = timeSource.getTime(); at.markAttemptingAction(); at.minimumWaitBetweenActions(); - long elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS); + long elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS); assertTrue(elaspsedTime + "ms", elaspsedTime >= 995); diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java index ec06b23d3df..9730c5b6e1f 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.core.CoreContainer; +import org.apache.solr.util.TimeSource; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -42,11 +43,13 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { private static AtomicBoolean actionInitCalled = new AtomicBoolean(false); private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false); - private AutoScaling.TriggerListener noFirstRunListener = event -> { + private AutoScaling.TriggerListener noFirstRunListener = event -> { fail("Did not expect the listener to fire on first run!"); return true; }; + private static final TimeSource timeSource = TimeSource.CURRENT_TIME; + @BeforeClass public static void setupCluster() throws Exception { configureCluster(1) @@ -73,11 +76,11 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { JettySolrRunner newNode = cluster.startJettySolrRunner(); AtomicBoolean fired = new AtomicBoolean(false); - AtomicReference eventRef = new AtomicReference<>(); + AtomicReference eventRef = new AtomicReference<>(); trigger.setListener(event -> { if (fired.compareAndSet(false, true)) { eventRef.set(event); - if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { + if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { fail("NodeAddedListener was fired before the configured waitFor period"); } } else { @@ -94,9 +97,9 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { } } while (!fired.get()); - NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get(); + TriggerEvent nodeAddedEvent = eventRef.get(); assertNotNull(nodeAddedEvent); - assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName()); + assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME)); } // add a new node but remove it before the waitFor period expires @@ -111,7 +114,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { AtomicBoolean fired = new AtomicBoolean(false); trigger.setListener(event -> { if (fired.compareAndSet(false, true)) { - if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) { + if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) { fail("NodeAddedListener was fired before the configured waitFor period"); } } else { @@ -170,7 +173,7 @@ public 
class NodeAddedTriggerTest extends SolrCloudTestCase { } @Override - public void process(AutoScaling.TriggerEvent event) { + public void process(TriggerEvent event) { } @@ -246,11 +249,11 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container)) { AtomicBoolean fired = new AtomicBoolean(false); - AtomicReference eventRef = new AtomicReference<>(); + AtomicReference eventRef = new AtomicReference<>(); newTrigger.setListener(event -> { if (fired.compareAndSet(false, true)) { eventRef.set(event); - if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) { + if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) { fail("NodeAddedListener was fired before the configured waitFor period"); } } else { @@ -270,9 +273,9 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { // ensure the event was fired assertTrue(fired.get()); - NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get(); + TriggerEvent nodeAddedEvent = eventRef.get(); assertNotNull(nodeAddedEvent); - assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName()); + assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME)); } } diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java index 9baae0f9bb4..6e354672d4d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java @@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.cloud.SolrCloudTestCase; -import org.apache.solr.common.util.Utils; import org.apache.solr.core.CoreContainer; +import org.apache.solr.util.TimeSource; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -43,11 +43,14 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { private static AtomicBoolean actionInitCalled = new AtomicBoolean(false); private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false); - private AutoScaling.TriggerListener noFirstRunListener = event -> { + private AutoScaling.TriggerListener noFirstRunListener = event -> { fail("Did not expect the listener to fire on first run!"); return true; }; + // use the same time source as the trigger + private final TimeSource timeSource = TimeSource.CURRENT_TIME; + @BeforeClass public static void setupCluster() throws Exception { configureCluster(5) @@ -75,11 +78,11 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { cluster.stopJettySolrRunner(1); AtomicBoolean fired = new AtomicBoolean(false); - AtomicReference eventRef = new AtomicReference<>(); + AtomicReference eventRef = new AtomicReference<>(); trigger.setListener(event -> { if (fired.compareAndSet(false, true)) { eventRef.set(event); - if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { + if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { fail("NodeLostListener was fired before the configured waitFor period"); } } else { @@ -96,9 +99,9 @@ public class NodeLostTriggerTest extends SolrCloudTestCase 
{ } } while (!fired.get()); - NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get(); + TriggerEvent nodeLostEvent = eventRef.get(); assertNotNull(nodeLostEvent); - assertEquals("", lostNodeName, nodeLostEvent.getNodeName()); + assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME)); } @@ -115,7 +118,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { AtomicBoolean fired = new AtomicBoolean(false); trigger.setListener(event -> { if (fired.compareAndSet(false, true)) { - if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) { + if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) { fail("NodeLostListener was fired before the configured waitFor period"); } } else { @@ -184,7 +187,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { } @Override - public void process(AutoScaling.TriggerEvent event) { + public void process(TriggerEvent event) { } @@ -284,11 +287,11 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container)) { AtomicBoolean fired = new AtomicBoolean(false); - AtomicReference eventRef = new AtomicReference<>(); + AtomicReference eventRef = new AtomicReference<>(); newTrigger.setListener(event -> { if (fired.compareAndSet(false, true)) { eventRef.set(event); - if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { + if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { fail("NodeLostListener was fired before the configured waitFor period"); } } else { @@ -306,9 +309,9 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { } } while (!fired.get()); - NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get(); + TriggerEvent nodeLostEvent = eventRef.get(); assertNotNull(nodeLostEvent); - assertEquals("", lostNodeName, nodeLostEvent.getNodeName()); + assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME)); } } diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java index 7850f334b00..1c189aebd5c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java @@ -39,6 +39,7 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.Utils; import org.apache.solr.util.LogLevel; import org.apache.solr.util.TimeOut; +import org.apache.solr.util.TimeSource; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.BeforeClass; @@ -60,11 +61,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { private static CountDownLatch actionInitCalled; private static CountDownLatch triggerFiredLatch; private static int waitForSeconds = 1; + private static CountDownLatch actionStarted; + private static CountDownLatch actionInterrupted; + private static CountDownLatch actionCompleted; private static AtomicBoolean triggerFired; - private static AtomicReference eventRef; + private static AtomicReference eventRef; private String path; + // use the same time source as triggers use + private static final TimeSource timeSource = TimeSource.CURRENT_TIME; + 
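
The three latches declared above let a test observe an asynchronous action's lifecycle: it started, it completed, or it was interrupted because the overseer went away. A self-contained sketch of that coordination pattern; the worker thread and timeouts are illustrative stand-ins for the real trigger machinery:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ActionLifecycleLatchSketch {

  static CountDownLatch actionStarted = new CountDownLatch(1);
  static CountDownLatch actionInterrupted = new CountDownLatch(1);
  static CountDownLatch actionCompleted = new CountDownLatch(1);

  // Stand-in for a long-running TriggerAction.process(...) body.
  static void process() {
    actionStarted.countDown();
    try {
      Thread.sleep(5000);              // simulate slow work
      actionCompleted.countDown();
    } catch (InterruptedException e) {
      actionInterrupted.countDown();   // overseer shutdown interrupts the action
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws Exception {
    Thread worker = new Thread(ActionLifecycleLatchSketch::process);
    worker.start();
    if (!actionStarted.await(10, TimeUnit.SECONDS)) {
      throw new AssertionError("action did not start");
    }
    worker.interrupt();                // analogous to stopping the overseer node
    if (!actionInterrupted.await(10, TimeUnit.SECONDS)) {
      throw new AssertionError("action was not interrupted");
    }
  }
}
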
@BeforeClass public static void setupCluster() throws Exception { configureCluster(2) @@ -72,6 +79,22 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { .configure(); } + private static CountDownLatch getTriggerFiredLatch() { + return triggerFiredLatch; + } + + private static CountDownLatch getActionStarted() { + return actionStarted; + } + + private static CountDownLatch getActionInterrupted() { + return actionInterrupted; + } + + private static CountDownLatch getActionCompleted() { + return actionCompleted; + } + @Before public void setupTest() throws Exception { waitForSeconds = 1 + random().nextInt(3); @@ -79,6 +102,9 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { actionInitCalled = new CountDownLatch(1); triggerFiredLatch = new CountDownLatch(1); triggerFired = new AtomicBoolean(false); + actionStarted = new CountDownLatch(1); + actionInterrupted = new CountDownLatch(1); + actionCompleted = new CountDownLatch(1); eventRef = new AtomicReference<>(); // clear any persisted auto scaling configuration Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true); @@ -96,7 +122,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { @Test public void testTriggerThrottling() throws Exception { // for this test we want to create two triggers so we must assert that the actions were created twice - TriggerIntegrationTest.actionInitCalled = new CountDownLatch(2); + actionInitCalled = new CountDownLatch(2); // similarly we want both triggers to fire triggerFiredLatch = new CountDownLatch(2); @@ -109,7 +135,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { "'event' : 'nodeAdded'," + "'waitFor' : '0s'," + "'enabled' : true," + - "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" + + "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" + "}}"; SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand); NamedList response = solrClient.request(req); @@ -122,7 +148,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { "'event' : 'nodeAdded'," + "'waitFor' : '0s'," + "'enabled' : true," + - "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" + + "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" + "}}"; req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand); response = solrClient.request(req); @@ -135,7 +161,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { JettySolrRunner newNode = cluster.startJettySolrRunner(); - if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) { + if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) { fail("Both triggers should have fired by now"); } @@ -150,7 +176,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { "'event' : 'nodeLost'," + "'waitFor' : '0s'," + "'enabled' : true," + - "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" + + "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" + "}}"; req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand); response = solrClient.request(req); @@ -162,7 +188,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { "'event' : 'nodeLost'," + "'waitFor' : '0s'," + "'enabled' : true," + - "'actions' : 
[{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" + + "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" + "}}"; req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand); response = solrClient.request(req); @@ -183,14 +209,14 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { } } - if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) { + if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) { fail("Both triggers should have fired by now"); } } static AtomicLong lastActionExecutedAt = new AtomicLong(0); static ReentrantLock lock = new ReentrantLock(); - public static class ThrottingTesterAction extends TestTriggerAction { + public static class ThrottlingTesterAction extends TestTriggerAction { // nanos are very precise so we need a delta for comparison with ms private static final long DELTA_MS = 2; @@ -198,7 +224,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { private final AtomicBoolean onlyOnce = new AtomicBoolean(false); @Override - public void process(AutoScaling.TriggerEvent event) { + public void process(TriggerEvent event) { boolean locked = lock.tryLock(); if (!locked) { log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently"); @@ -206,18 +232,18 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { } try { if (lastActionExecutedAt.get() != 0) { - log.info("last action at " + lastActionExecutedAt.get() + " nano time = " + System.nanoTime()); - if (System.nanoTime() - lastActionExecutedAt.get() < TimeUnit.NANOSECONDS.convert(ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS, TimeUnit.MILLISECONDS)) { - log.info("action executed again before minimum wait time from {}", event.getSource().getName()); + log.info("last action at " + lastActionExecutedAt.get() + " time = " + timeSource.getTime()); + if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - lastActionExecutedAt.get(), TimeUnit.NANOSECONDS) < ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS) { + log.info("action executed again before minimum wait time from {}", event.getSource()); fail("TriggerListener was fired before the throttling period"); } } if (onlyOnce.compareAndSet(false, true)) { - log.info("action executed from {}", event.getSource().getName()); - lastActionExecutedAt.set(System.nanoTime()); - triggerFiredLatch.countDown(); + log.info("action executed from {}", event.getSource()); + lastActionExecutedAt.set(timeSource.getTime()); + getTriggerFiredLatch().countDown(); } else { - log.info("action executed more than once from {}", event.getSource().getName()); + log.info("action executed more than once from {}", event.getSource()); fail("Trigger should not have fired more than once!"); } } finally { @@ -293,7 +319,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get(); assertNotNull(nodeLostEvent); assertEquals("The node added trigger was fired but for a different node", - nodeName, nodeLostEvent.getNodeName()); + nodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME)); } @Test @@ -351,7 +377,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get(); assertNotNull(nodeAddedEvent); assertEquals("The node added trigger was fired but for a different node", - 
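
The throttling check above converts the elapsed nanoseconds to milliseconds before comparing against ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS, less a small DELTA_MS of slack for timer jitter. A standalone sketch of just that comparison; the 1000ms minimum and 2ms delta below are illustrative values, not the shipped defaults:

import java.util.concurrent.TimeUnit;

public class ThrottleWindowCheckSketch {

  // Returns true when the second action ran too soon after the first one.
  static boolean firedTooSoon(long lastActionNanos, long nowNanos,
                              long minMsBetweenActions, long deltaMs) {
    long elapsedMs = TimeUnit.MILLISECONDS.convert(nowNanos - lastActionNanos, TimeUnit.NANOSECONDS);
    return elapsedMs < minMsBetweenActions - deltaMs;
  }

  public static void main(String[] args) {
    long last = 0L;
    long now = TimeUnit.NANOSECONDS.convert(999, TimeUnit.MILLISECONDS);
    // With a 1000ms minimum and 2ms slack, 999ms elapsed is still acceptable.
    System.out.println(firedTooSoon(last, now, 1000, 2)); // prints false
  }
}
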
newNode.getNodeName(), nodeAddedEvent.getNodeName()); + newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME)); } @Test @@ -380,7 +406,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get(); assertNotNull(nodeAddedEvent); assertEquals("The node added trigger was fired but for a different node", - newNode.getNodeName(), nodeAddedEvent.getNodeName()); + newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME)); // reset actionConstructorCalled = new CountDownLatch(1); @@ -443,7 +469,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get(); assertNotNull(nodeLostEvent); assertEquals("The node lost trigger was fired but for a different node", - lostNodeName, nodeLostEvent.getNodeName()); + lostNodeName, nodeLostEvent.getProperty(TriggerEvent.NODE_NAME)); // reset actionConstructorCalled = new CountDownLatch(1); @@ -507,7 +533,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { // stop the overseer, somebody else will take over as the overseer cluster.stopJettySolrRunner(index); - + Thread.sleep(10000); JettySolrRunner newNode = cluster.startJettySolrRunner(); boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS); assertTrue("The trigger did not fire at all", await); @@ -515,7 +541,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get(); assertNotNull(nodeAddedEvent); assertEquals("The node added trigger was fired but for a different node", - newNode.getNodeName(), nodeAddedEvent.getNodeName()); + newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME)); } public static class TestTriggerAction implements TriggerAction { @@ -535,15 +561,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { } @Override - public void process(AutoScaling.TriggerEvent event) { - if (triggerFired.compareAndSet(false, true)) { - eventRef.set(event); - if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { - fail("NodeAddedListener was fired before the configured waitFor period"); + public void process(TriggerEvent event) { + try { + if (triggerFired.compareAndSet(false, true)) { + eventRef.set(event); + if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - event.getEventTime(), TimeUnit.NANOSECONDS) <= TimeUnit.MILLISECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) { + fail("NodeAddedListener was fired before the configured waitFor period"); + } + getTriggerFiredLatch().countDown(); + } else { + fail("NodeAddedTrigger was fired more than once!"); } - triggerFiredLatch.countDown(); - } else { - fail("NodeAddedTrigger was fired more than once!"); + } catch (Throwable t) { + log.debug("--throwable", t); + throw t; } } @@ -558,4 +589,152 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { actionInitCalled.countDown(); } } + + public static class TestEventQueueAction implements TriggerAction { + + public TestEventQueueAction() { + log.info("TestEventQueueAction instantiated"); + } + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getClassName() { + return this.getClass().getName(); + } + + @Override + public void process(TriggerEvent event) 
{ + eventRef.set(event); + getActionStarted().countDown(); + try { + Thread.sleep(5000); + triggerFired.compareAndSet(false, true); + getActionCompleted().countDown(); + } catch (InterruptedException e) { + getActionInterrupted().countDown(); + return; + } + } + + @Override + public void close() throws IOException { + + } + + @Override + public void init(Map args) { + log.debug("TestTriggerAction init"); + actionInitCalled.countDown(); + } + } + + @Test + public void testEventQueue() throws Exception { + CloudSolrClient solrClient = cluster.getSolrClient(); + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : 'node_added_trigger1'," + + "'event' : 'nodeAdded'," + + "'waitFor' : '" + waitForSeconds + "s'," + + "'enabled' : true," + + "'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" + + "}}"; + NamedList overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus()); + String overseerLeader = (String) overSeerStatus.get("leader"); + int overseerLeaderIndex = 0; + for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) { + JettySolrRunner jetty = cluster.getJettySolrRunner(i); + if (jetty.getNodeName().equals(overseerLeader)) { + overseerLeaderIndex = i; + break; + } + } + SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand); + NamedList response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + if (!actionInitCalled.await(3, TimeUnit.SECONDS)) { + fail("The TriggerAction should have been created by now"); + } + + // add node to generate the event + JettySolrRunner newNode = cluster.startJettySolrRunner(); + boolean await = actionStarted.await(60, TimeUnit.SECONDS); + assertTrue("action did not start", await); + // event should be there + NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get(); + assertNotNull(nodeAddedEvent); + // but action did not complete yet so the event is still enqueued + assertFalse(triggerFired.get()); + actionStarted = new CountDownLatch(1); + // kill overseer leader + cluster.stopJettySolrRunner(overseerLeaderIndex); + Thread.sleep(5000); + await = actionInterrupted.await(3, TimeUnit.SECONDS); + assertTrue("action wasn't interrupted", await); + // new overseer leader should be elected and run triggers + newNode = cluster.startJettySolrRunner(); + // it should fire again but not complete yet + await = actionStarted.await(60, TimeUnit.SECONDS); + TriggerEvent replayedEvent = eventRef.get(); + assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null); + assertTrue(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null); + await = actionCompleted.await(10, TimeUnit.SECONDS); + assertTrue(triggerFired.get()); + } + + @Test + public void testEventFromRestoredState() throws Exception { + CloudSolrClient solrClient = cluster.getSolrClient(); + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : 'node_added_trigger'," + + "'event' : 'nodeAdded'," + + "'waitFor' : '10s'," + + "'enabled' : true," + + "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" + + "}}"; + SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand); + NamedList response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + if (!actionInitCalled.await(10, TimeUnit.SECONDS)) { + fail("The 
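
The replay assertions above depend on the event queue stamping each event as it passes through ZooKeeper, so a replayed event is distinguishable from a freshly generated one. A small sketch of that check, assuming TriggerEventQueue and its ENQUEUE_TIME/DEQUEUE_TIME keys live in the same org.apache.solr.cloud.autoscaling package as these tests:

import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerEventQueue;

public class ReplayedEventCheckSketch {

  // True when the event was round-tripped through the persistent event queue.
  static boolean wasReplayed(TriggerEvent event) {
    return event.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null
        && event.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null;
  }
}
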
TriggerAction should have been created by now"); + } + + NamedList overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus()); + String overseerLeader = (String) overSeerStatus.get("leader"); + int overseerLeaderIndex = 0; + for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) { + JettySolrRunner jetty = cluster.getJettySolrRunner(i); + if (jetty.getNodeName().equals(overseerLeader)) { + overseerLeaderIndex = i; + break; + } + } + + JettySolrRunner newNode = cluster.startJettySolrRunner(); + boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS); + assertTrue("The trigger did not fire at all", await); + assertTrue(triggerFired.get()); + // reset + triggerFired.set(false); + triggerFiredLatch = new CountDownLatch(1); + NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get(); + assertNotNull(nodeAddedEvent); + assertEquals("The node added trigger was fired but for a different node", + newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME)); + // add a second node - state of the trigger will change but it won't fire for waitFor sec. + JettySolrRunner newNode2 = cluster.startJettySolrRunner(); + Thread.sleep(10000); + // kill overseer leader + cluster.stopJettySolrRunner(overseerLeaderIndex); + await = triggerFiredLatch.await(20, TimeUnit.SECONDS); + assertTrue("The trigger did not fire at all", await); + assertTrue(triggerFired.get()); + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 0bcd9cafbe6..f23b0b562d1 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -92,6 +92,8 @@ public class ZkStateReader implements Closeable { public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead"; public static final String SOLR_SECURITY_CONF_PATH = "/security.json"; public static final String SOLR_AUTOSCALING_CONF_PATH = "/autoscaling.json"; + public static final String SOLR_AUTOSCALING_EVENTS_PATH = "/autoscaling/events"; + public static final String SOLR_AUTOSCALING_TRIGGER_STATE_PATH = "/autoscaling/triggerState"; public static final String REPLICATION_FACTOR = "replicationFactor"; public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode"; diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java index 7e92629b564..cf83dee7b4c 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java @@ -31,6 +31,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -53,33 +55,59 @@ public class Utils { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static Map getDeepCopy(Map map, int maxDepth) { - return getDeepCopy(map, maxDepth, true); + return getDeepCopy(map, maxDepth, true, false); } public static Map getDeepCopy(Map map, int maxDepth, boolean mutable) { + return getDeepCopy(map, maxDepth, mutable, false); + } + + public static Map getDeepCopy(Map map, int maxDepth, boolean mutable, boolean sorted) { if(map == null) return null; if (maxDepth < 1) return map; - Map copy = new 
LinkedHashMap();
+    Map copy;
+    if (sorted) {
+      copy = new TreeMap();
+    } else {
+      copy = new LinkedHashMap();
+    }
     for (Object o : map.entrySet()) {
       Map.Entry e = (Map.Entry) o;
-      copy.put(e.getKey(), makeDeepCopy(e.getValue(),maxDepth, mutable));
+      copy.put(e.getKey(), makeDeepCopy(e.getValue(),maxDepth, mutable, sorted));
     }
     return mutable ? copy : Collections.unmodifiableMap(copy);
   }

-  private static Object makeDeepCopy(Object v, int maxDepth, boolean mutable) {
-    if (v instanceof MapWriter && maxDepth > 1) v = ((MapWriter) v).toMap(new LinkedHashMap<>());
-    else if (v instanceof IteratorWriter && maxDepth > 1) v = ((IteratorWriter) v).toList(new ArrayList<>());
+  private static Object makeDeepCopy(Object v, int maxDepth, boolean mutable, boolean sorted) {
+    if (v instanceof MapWriter && maxDepth > 1) {
+      v = ((MapWriter) v).toMap(new LinkedHashMap<>());
+    } else if (v instanceof IteratorWriter && maxDepth > 1) {
+      v = ((IteratorWriter) v).toList(new ArrayList<>());
+      if (sorted) {
+        Collections.sort((List)v);
+      }
+    }

-    if (v instanceof Map) v = getDeepCopy((Map) v, maxDepth - 1, mutable);
-    else if (v instanceof Collection) v = getDeepCopy((Collection) v, maxDepth - 1, mutable);
+    if (v instanceof Map) {
+      v = getDeepCopy((Map) v, maxDepth - 1, mutable, sorted);
+    } else if (v instanceof Collection) {
+      v = getDeepCopy((Collection) v, maxDepth - 1, mutable, sorted);
+    }
     return v;
   }

   public static Collection getDeepCopy(Collection c, int maxDepth, boolean mutable) {
+    return getDeepCopy(c, maxDepth, mutable, false);
+  }
+
+  public static Collection getDeepCopy(Collection c, int maxDepth, boolean mutable, boolean sorted) {
     if (c == null || maxDepth < 1) return c;
-    Collection result = c instanceof Set ? new HashSet() : new ArrayList();
-    for (Object o : c) result.add(makeDeepCopy(o, maxDepth, mutable));
+    Collection result = c instanceof Set ?
+        ( sorted? new TreeSet() : new HashSet()) : new ArrayList();
+    for (Object o : c) result.add(makeDeepCopy(o, maxDepth, mutable, sorted));
+    if (sorted && (result instanceof List)) {
+      Collections.sort((List)result);
+    }
     return mutable ? result : result instanceof Set ? unmodifiableSet((Set) result) : unmodifiableList((List) result);
   }
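
The four-argument Utils.getDeepCopy added above switches to TreeMap/TreeSet when sorted is true, which yields a deterministic ordering for nested structures that are serialized or compared, such as persisted trigger state. A small usage sketch of the new overload; the example map contents are illustrative:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.solr.common.util.Utils;

public class SortedDeepCopySketch {
  public static void main(String[] args) {
    Map<String, Object> state = new LinkedHashMap<>();
    state.put("nodeNames", Arrays.asList("node_b", "node_a"));
    state.put("eventTime", 12345L);
    state.put("actionName", "compute_plan");

    // Keys come back in natural order and nested lists are sorted, so repeated
    // copies of equivalent state render identically.
    Map copy = Utils.getDeepCopy(state, 10, false, true);
    System.out.println(copy); // {actionName=compute_plan, eventTime=12345, nodeNames=[node_a, node_b]}
  }
}

Because mutable is false here, the copy comes back wrapped in unmodifiable views, so it can be shared or cached safely.
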