mirror of https://github.com/apache/lucene.git

commit a0cd8decc6
Merge remote-tracking branch 'origin/feature/autoscaling' into feature/autoscaling
@@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.solr.util.TimeSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,41 +32,29 @@ public class ActionThrottle {
   private volatile Long minMsBetweenActions;
 
   private final String name;
-  private final NanoTimeSource nanoTimeSource;
-
-  public interface NanoTimeSource {
-    long getTime();
-  }
-
-  private static class DefaultNanoTimeSource implements NanoTimeSource {
-    @Override
-    public long getTime() {
-      return System.nanoTime();
-    }
-  }
+  private final TimeSource timeSource;
 
   public ActionThrottle(String name, long minMsBetweenActions) {
     this.name = name;
     this.minMsBetweenActions = minMsBetweenActions;
-    this.nanoTimeSource = new DefaultNanoTimeSource();
+    this.timeSource = TimeSource.NANO_TIME;
   }
 
-  public ActionThrottle(String name, long minMsBetweenActions, NanoTimeSource nanoTimeSource) {
+  public ActionThrottle(String name, long minMsBetweenActions, TimeSource timeSource) {
     this.name = name;
     this.minMsBetweenActions = minMsBetweenActions;
-    this.nanoTimeSource = nanoTimeSource;
+    this.timeSource = timeSource;
   }
 
   public void markAttemptingAction() {
-    lastActionStartedAt = nanoTimeSource.getTime();
+    lastActionStartedAt = timeSource.getTime();
   }
 
   public void minimumWaitBetweenActions() {
     if (lastActionStartedAt == null) {
       return;
     }
-    long diff = nanoTimeSource.getTime() - lastActionStartedAt;
+    long diff = timeSource.getTime() - lastActionStartedAt;
     int diffMs = (int) TimeUnit.MILLISECONDS.convert(diff, TimeUnit.NANOSECONDS);
     long minNsBetweenActions = TimeUnit.NANOSECONDS.convert(minMsBetweenActions, TimeUnit.MILLISECONDS);
     log.info("The last {} attempt started {}ms ago.", name, diffMs);
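Reviewer note (illustration, not part of the diff): folding the private NanoTimeSource
into the shared TimeSource abstraction means a fake clock can now be injected for
deterministic tests. A minimal sketch, assuming only the classes in this commit:

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.solr.cloud.ActionThrottle;
    import org.apache.solr.util.TimeSource;

    public class ActionThrottleClockDemo {
      public static void main(String[] args) {
        // Scripted clock: each getTime() call returns the next nanosecond tick.
        Iterator<Long> ticks = Arrays.asList(0L, 1_000_000L).iterator();
        TimeSource fakeClock = new TimeSource() {
          @Override
          public long getTime() {
            return ticks.next();
          }
        };
        ActionThrottle throttle = new ActionThrottle("demo", 1000, fakeClock);
        throttle.markAttemptingAction();      // records t = 0 ns from the fake clock
        throttle.minimumWaitBetweenActions(); // sees 1 ms elapsed, so it waits ~999 ms
      }
    }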
@@ -664,6 +664,8 @@ public class ZkController {
       cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
       cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
       cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
+      cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH, zkClient);
+      cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, zkClient);
       byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
       cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
       cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
@@ -48,24 +48,15 @@ public class AutoScaling {
     AFTER_ACTION
   }
 
-  public static interface TriggerEvent<T extends Trigger> {
-    public T getSource();
-
-    public long getEventNanoTime();
-
-    public void setContext(Map<String, Object> context);
-
-    public Map<String, Object> getContext();
-  }
-
-  public static interface TriggerListener<E extends TriggerEvent<? extends Trigger>> {
+  public interface TriggerListener {
     /**
      * This method is executed when a trigger is ready to fire.
      *
      * @param event a subclass of {@link TriggerEvent}
-     * @return true if the listener was ready to perform actions on the event, false otherwise.
+     * @return true if the listener was ready to perform actions on the event, false
+     *         otherwise. If false was returned then callers should assume the event was discarded.
      */
-    public boolean triggerFired(E event);
+    boolean triggerFired(TriggerEvent event);
   }
 
   public static class HttpCallbackListener implements TriggerListener {
 
@@ -78,7 +69,7 @@ public class AutoScaling {
   /**
    * Interface for a Solr trigger. Each trigger implements Runnable and Closeable interface. A trigger
    * is scheduled using a {@link java.util.concurrent.ScheduledExecutorService} so it is executed as
-   * per a configured schedule to check whether the trigger is ready to fire. The {@link #setListener(TriggerListener)}
+   * per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setListener(TriggerListener)}
    * method should be used to set a callback listener which is fired by implementation of this class whenever
    * ready.
    * <p>
 
@@ -92,29 +83,47 @@ public class AutoScaling {
    * with the proper trigger event object. If that method returns false then it should be interpreted to mean
    * that Solr is not ready to process this trigger event and therefore we should retain the state and fire
    * at the next invocation of the run() method.
-   *
-   * @param <E> the {@link TriggerEvent} which is handled by this Trigger
    */
-  public static interface Trigger<E extends TriggerEvent<? extends Trigger>> extends Closeable, Runnable {
-    public String getName();
+  public interface Trigger extends Closeable, Runnable {
+    /**
+     * Trigger name.
+     */
+    String getName();
 
-    public EventType getEventType();
+    /**
+     * Event type generated by this trigger.
+     */
+    EventType getEventType();
 
-    public boolean isEnabled();
+    /** Returns true if this trigger is enabled. */
+    boolean isEnabled();
 
-    public Map<String, Object> getProperties();
+    /** Trigger properties. */
+    Map<String, Object> getProperties();
 
-    public int getWaitForSecond();
+    /** Number of seconds to wait between fired events ("waitFor" property). */
+    int getWaitForSecond();
 
-    public List<TriggerAction> getActions();
+    /** Actions to execute when event is fired. */
+    List<TriggerAction> getActions();
 
-    public void setListener(TriggerListener<E> listener);
+    /** Set event listener to call when event is fired. */
+    void setListener(TriggerListener listener);
 
-    public TriggerListener<E> getListener();
+    /** Get event listener. */
+    TriggerListener getListener();
 
-    public boolean isClosed();
+    /** Return true when this trigger is closed and cannot be used. */
+    boolean isClosed();
 
-    public void restoreState(Trigger<E> old);
+    /** Set internal state of this trigger from another instance. */
+    void restoreState(Trigger old);
 
+    /** Save internal state of this trigger in ZooKeeper. */
+    void saveState();
 
+    /** Restore internal state of this trigger from ZooKeeper. */
+    void restoreState();
 
    /**
     * Called before a trigger is scheduled. Any heavy object creation or initialisation should
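Reviewer note (illustration, not part of the diff): dropping the type parameters means a
single listener implementation can serve any trigger type, and TriggerListener is now a
single-method interface, so it is lambda-friendly. Given some AutoScaling.Trigger trigger:

    AutoScaling.TriggerListener listener = event -> {
      // Returning false would tell the trigger to keep its state and offer the
      // event again on a later run() invocation.
      System.out.println("Event " + event.getId() + " from trigger '" + event.getSource() + "'");
      return true;
    };
    trigger.setListener(listener);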
@@ -39,7 +39,6 @@ import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.handler.RequestHandlerBase;
 
@@ -49,6 +48,7 @@ import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.security.AuthorizationContext;
 import org.apache.solr.security.PermissionNameProvider;
 import org.apache.solr.util.CommandOperation;
+import org.apache.solr.util.TimeSource;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 
@@ -66,6 +66,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements PermissionNameProvider {
   protected final CoreContainer container;
   private final List<Map<String, String>> DEFAULT_ACTIONS = new ArrayList<>(3);
   private static ImmutableSet<String> singletonCommands = ImmutableSet.of("set-cluster-preferences", "set-cluster-policy");
+  private static final TimeSource timeSource = TimeSource.CURRENT_TIME;
 
 
   public AutoScalingHandler(CoreContainer container) {
 
@@ -250,7 +251,6 @@ public class AutoScalingHandler extends RequestHandlerBase implements PermissionNameProvider {
     rsp.getValues().add("result", "success");
   }
 
-  @SuppressForbidden(reason = "currentTimeMillis is used to find the resume time for the trigger")
   private void handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
     String triggerName = op.getStr("name");
 
@@ -263,7 +263,8 @@
       if (timeout != null) {
         try {
           int timeoutSeconds = parseHumanTime(timeout);
-          resumeTime = new Date(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS));
+          resumeTime = new Date(TimeUnit.MILLISECONDS.convert(timeSource.getTime(), TimeUnit.NANOSECONDS)
+              + TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS));
         } catch (IllegalArgumentException e) {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'timeout' value for suspend trigger: " + triggerName);
         }
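Reviewer note: TimeSource.CURRENT_TIME (added later in this commit) reports epoch
wall-clock time, but expressed in nanoseconds, so converting back to milliseconds before
adding the timeout preserves the previous System.currentTimeMillis() semantics:

    long nowNs = timeSource.getTime();                                        // epoch time, in ns
    long nowMs = TimeUnit.MILLISECONDS.convert(nowNs, TimeUnit.NANOSECONDS);  // ~= System.currentTimeMillis()
    long timeoutMs = TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS);
    Date resumeTime = new Date(nowMs + timeoutMs);                            // instant at which to resume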
@@ -45,7 +45,7 @@ public class ComputePlanAction implements TriggerAction {
   }
 
   @Override
-  public void process(AutoScaling.TriggerEvent event) {
+  public void process(TriggerEvent event) {
 
   }
 }
@@ -45,7 +45,7 @@ public class ExecutePlanAction implements TriggerAction {
   }
 
   @Override
-  public void process(AutoScaling.TriggerEvent event) {
+  public void process(TriggerEvent event) {
 
   }
 }
@@ -45,7 +45,7 @@ public class LogPlanAction implements TriggerAction {
   }
 
   @Override
-  public void process(AutoScaling.TriggerEvent event) {
+  public void process(TriggerEvent event) {
 
   }
 }
@@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 
@@ -35,23 +36,25 @@ import org.apache.lucene.util.IOUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.TimeSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Trigger for the {@link org.apache.solr.cloud.autoscaling.AutoScaling.EventType#NODEADDED} event
  */
-public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
+public class NodeAddedTrigger extends TriggerBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final String name;
   private final Map<String, Object> properties;
   private final CoreContainer container;
   private final List<TriggerAction> actions;
-  private final AtomicReference<AutoScaling.TriggerListener<NodeAddedEvent>> listenerRef;
+  private final AtomicReference<AutoScaling.TriggerListener> listenerRef;
   private final boolean enabled;
   private final int waitForSecond;
   private final AutoScaling.EventType eventType;
+  private final TimeSource timeSource;
 
   private boolean isClosed = false;
 
@@ -61,9 +64,11 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
 
   public NodeAddedTrigger(String name, Map<String, Object> properties,
                           CoreContainer container) {
+    super(container.getZkController().getZkClient());
     this.name = name;
     this.properties = properties;
     this.container = container;
+    this.timeSource = TimeSource.CURRENT_TIME;
     this.listenerRef = new AtomicReference<>();
     List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
     if (o != null && !o.isEmpty()) {
 
@@ -75,6 +80,8 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
     } else {
       actions = Collections.emptyList();
     }
+    lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
+    log.debug("Initial livenodes: {}", lastLiveNodes);
     this.enabled = (boolean) properties.getOrDefault("enabled", true);
     this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
     this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
 
@@ -93,12 +100,12 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
   }
 
   @Override
-  public void setListener(AutoScaling.TriggerListener<NodeAddedEvent> listener) {
+  public void setListener(AutoScaling.TriggerListener listener) {
     listenerRef.set(listener);
   }
 
   @Override
-  public AutoScaling.TriggerListener<NodeAddedEvent> getListener() {
+  public AutoScaling.TriggerListener getListener() {
     return listenerRef.get();
   }
 
@@ -156,7 +163,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
   }
 
   @Override
-  public void restoreState(AutoScaling.Trigger<NodeAddedEvent> old) {
+  public void restoreState(AutoScaling.Trigger old) {
     assert old.isClosed();
     if (old instanceof NodeAddedTrigger) {
       NodeAddedTrigger that = (NodeAddedTrigger) old;
 
@@ -169,6 +176,28 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
     }
   }
 
+  @Override
+  protected Map<String, Object> getState() {
+    Map<String,Object> state = new HashMap<>();
+    state.put("lastLiveNodes", lastLiveNodes);
+    state.put("nodeNameVsTimeAdded", nodeNameVsTimeAdded);
+    return state;
+  }
+
+  @Override
+  protected void setState(Map<String, Object> state) {
+    this.lastLiveNodes.clear();
+    this.nodeNameVsTimeAdded.clear();
+    Collection<String> lastLiveNodes = (Collection<String>)state.get("lastLiveNodes");
+    if (lastLiveNodes != null) {
+      this.lastLiveNodes.addAll(lastLiveNodes);
+    }
+    Map<String,Long> nodeNameVsTimeAdded = (Map<String,Long>)state.get("nodeNameVsTimeAdded");
+    if (nodeNameVsTimeAdded != null) {
+      this.nodeNameVsTimeAdded.putAll(nodeNameVsTimeAdded);
+    }
+  }
+
   @Override
   public void run() {
     try {
 
@@ -183,10 +212,6 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
       ZkStateReader reader = container.getZkController().getZkStateReader();
       Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
       log.debug("Found livenodes: {}", newLiveNodes);
-      if (lastLiveNodes == null) {
-        lastLiveNodes = newLiveNodes;
-        return;
-      }
 
       // have any nodes that we were tracking been removed from the cluster?
       // if so, remove them from the tracking map
 
@@ -197,22 +222,22 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
       Set<String> copyOfNew = new HashSet<>(newLiveNodes);
       copyOfNew.removeAll(lastLiveNodes);
       copyOfNew.forEach(n -> {
-        long nanoTime = System.nanoTime();
-        nodeNameVsTimeAdded.put(n, nanoTime);
-        log.info("Tracking new node: {} at {} nanotime", n, nanoTime);
+        long eventTime = timeSource.getTime();
+        nodeNameVsTimeAdded.put(n, eventTime);
+        log.debug("Tracking new node: {} at time {}", n, eventTime);
       });
 
       // has enough time expired to trigger events for a node?
       for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
         String nodeName = entry.getKey();
         Long timeAdded = entry.getValue();
-        long now = System.nanoTime();
+        long now = timeSource.getTime();
         if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
           // fire!
-          AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
+          AutoScaling.TriggerListener listener = listenerRef.get();
           if (listener != null) {
-            log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now);
-            if (listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName))) {
+            log.debug("NodeAddedTrigger {} firing registered listener for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
+            if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
               // remove from tracking set only if the fire was accepted
               trackingKeySet.remove(nodeName);
             }
 
@@ -222,7 +247,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
         }
       }
 
-      lastLiveNodes = newLiveNodes;
+      lastLiveNodes = new HashSet(newLiveNodes);
     } catch (RuntimeException e) {
       log.error("Unexpected exception in NodeAddedTrigger", e);
     }
 
@@ -235,45 +260,10 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
     }
   }
 
-  public static class NodeAddedEvent implements AutoScaling.TriggerEvent<NodeAddedTrigger> {
-    private final NodeAddedTrigger source;
-    private final long nodeAddedNanoTime;
-    private final String nodeName;
-
-    private Map<String, Object> context;
-
-    public NodeAddedEvent(NodeAddedTrigger source, long nodeAddedNanoTime, String nodeAdded) {
-      this.source = source;
-      this.nodeAddedNanoTime = nodeAddedNanoTime;
-      this.nodeName = nodeAdded;
-    }
-
-    @Override
-    public NodeAddedTrigger getSource() {
-      return source;
-    }
-
-    @Override
-    public long getEventNanoTime() {
-      return nodeAddedNanoTime;
-    }
-
-    public String getNodeName() {
-      return nodeName;
-    }
-
-    public AutoScaling.EventType getType() {
-      return source.getEventType();
-    }
-
-    @Override
-    public void setContext(Map<String, Object> context) {
-      this.context = context;
-    }
-
-    @Override
-    public Map<String, Object> getContext() {
-      return context;
+  public static class NodeAddedEvent extends TriggerEvent {
+
+    public NodeAddedEvent(AutoScaling.EventType eventType, String source, long nodeAddedTime, String nodeAdded) {
+      super(eventType, source, nodeAddedTime, Collections.singletonMap(NODE_NAME, nodeAdded));
     }
   }
 }
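Reviewer note (illustration, not part of the diff): NodeAddedEvent now carries the node
name as a regular event property instead of a dedicated getter, which is what lets it
round-trip through the JSON event queue. A sketch, with the trigger name and node name
invented and timeAddedNs standing for the nanosecond timestamp recorded by the trigger:

    NodeAddedTrigger.NodeAddedEvent ev = new NodeAddedTrigger.NodeAddedEvent(
        AutoScaling.EventType.NODEADDED, "node_added_trigger1", timeAddedNs, "127.0.0.1:8983_solr");
    String node = (String) ev.getProperty(TriggerEvent.NODE_NAME); // "127.0.0.1:8983_solr"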
@@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 
@@ -35,23 +36,25 @@ import org.apache.lucene.util.IOUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.TimeSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Trigger for the {@link AutoScaling.EventType#NODELOST} event
  */
-public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
+public class NodeLostTrigger extends TriggerBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final String name;
   private final Map<String, Object> properties;
   private final CoreContainer container;
   private final List<TriggerAction> actions;
-  private final AtomicReference<AutoScaling.TriggerListener<NodeLostEvent>> listenerRef;
+  private final AtomicReference<AutoScaling.TriggerListener> listenerRef;
   private final boolean enabled;
   private final int waitForSecond;
   private final AutoScaling.EventType eventType;
+  private final TimeSource timeSource;
 
   private boolean isClosed = false;
 
@@ -61,9 +64,11 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
 
   public NodeLostTrigger(String name, Map<String, Object> properties,
                          CoreContainer container) {
+    super(container.getZkController().getZkClient());
     this.name = name;
     this.properties = properties;
     this.container = container;
+    this.timeSource = TimeSource.CURRENT_TIME;
     this.listenerRef = new AtomicReference<>();
     List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
     if (o != null && !o.isEmpty()) {
 
@@ -75,7 +80,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
     } else {
       actions = Collections.emptyList();
     }
-    lastLiveNodes = container.getZkController().getZkStateReader().getClusterState().getLiveNodes();
+    lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
     log.debug("Initial livenodes: {}", lastLiveNodes);
     this.enabled = (boolean) properties.getOrDefault("enabled", true);
     this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
 
@@ -94,12 +99,12 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
   }
 
   @Override
-  public void setListener(AutoScaling.TriggerListener<NodeLostEvent> listener) {
+  public void setListener(AutoScaling.TriggerListener listener) {
     listenerRef.set(listener);
   }
 
   @Override
-  public AutoScaling.TriggerListener<NodeLostEvent> getListener() {
+  public AutoScaling.TriggerListener getListener() {
     return listenerRef.get();
   }
 
@@ -157,7 +162,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
   }
 
   @Override
-  public void restoreState(AutoScaling.Trigger<NodeLostEvent> old) {
+  public void restoreState(AutoScaling.Trigger old) {
     assert old.isClosed();
     if (old instanceof NodeLostTrigger) {
       NodeLostTrigger that = (NodeLostTrigger) old;
 
@@ -170,6 +175,28 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
     }
   }
 
+  @Override
+  protected Map<String, Object> getState() {
+    Map<String,Object> state = new HashMap<>();
+    state.put("lastLiveNodes", lastLiveNodes);
+    state.put("nodeNameVsTimeRemoved", nodeNameVsTimeRemoved);
+    return state;
+  }
+
+  @Override
+  protected void setState(Map<String, Object> state) {
+    this.lastLiveNodes.clear();
+    this.nodeNameVsTimeRemoved.clear();
+    Collection<String> lastLiveNodes = (Collection<String>)state.get("lastLiveNodes");
+    if (lastLiveNodes != null) {
+      this.lastLiveNodes.addAll(lastLiveNodes);
+    }
+    Map<String,Long> nodeNameVsTimeRemoved = (Map<String,Long>)state.get("nodeNameVsTimeRemoved");
+    if (nodeNameVsTimeRemoved != null) {
+      this.nodeNameVsTimeRemoved.putAll(nodeNameVsTimeRemoved);
+    }
+  }
+
   @Override
   public void run() {
     try {
 
@@ -194,20 +221,20 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
       Set<String> copyOfLastLiveNodes = new HashSet<>(lastLiveNodes);
       copyOfLastLiveNodes.removeAll(newLiveNodes);
       copyOfLastLiveNodes.forEach(n -> {
-        log.info("Tracking lost node: {}", n);
-        nodeNameVsTimeRemoved.put(n, System.nanoTime());
+        log.debug("Tracking lost node: {}", n);
+        nodeNameVsTimeRemoved.put(n, timeSource.getTime());
       });
 
       // has enough time expired to trigger events for a node?
       for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
         String nodeName = entry.getKey();
         Long timeRemoved = entry.getValue();
-        if (TimeUnit.SECONDS.convert(System.nanoTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
+        if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
           // fire!
-          AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
+          AutoScaling.TriggerListener listener = listenerRef.get();
           if (listener != null) {
-            log.info("NodeLostTrigger firing registered listener");
-            if (listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName))) {
+            log.debug("NodeLostTrigger firing registered listener");
+            if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
               trackingKeySet.remove(nodeName);
             }
           } else {
 
@@ -216,7 +243,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
        }
      }
 
-      lastLiveNodes = newLiveNodes;
+      lastLiveNodes = new HashSet<>(newLiveNodes);
     } catch (RuntimeException e) {
       log.error("Unexpected exception in NodeLostTrigger", e);
     }
 
@@ -229,45 +256,10 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
     }
   }
 
-  public static class NodeLostEvent implements AutoScaling.TriggerEvent<NodeLostTrigger> {
-    private final NodeLostTrigger source;
-    private final long nodeLostNanoTime;
-    private final String nodeName;
-
-    private Map<String, Object> context;
-
-    public NodeLostEvent(NodeLostTrigger source, long nodeLostNanoTime, String nodeRemoved) {
-      this.source = source;
-      this.nodeLostNanoTime = nodeLostNanoTime;
-      this.nodeName = nodeRemoved;
-    }
-
-    @Override
-    public NodeLostTrigger getSource() {
-      return source;
-    }
-
-    @Override
-    public long getEventNanoTime() {
-      return nodeLostNanoTime;
-    }
-
-    public String getNodeName() {
-      return nodeName;
-    }
-
-    public AutoScaling.EventType getType() {
-      return source.getEventType();
-    }
-
-    @Override
-    public void setContext(Map<String, Object> context) {
-      this.context = context;
-    }
-
-    @Override
-    public Map<String, Object> getContext() {
-      return context;
+  public static class NodeLostEvent extends TriggerEvent {
+
+    public NodeLostEvent(AutoScaling.EventType eventType, String source, long nodeLostTime, String nodeRemoved) {
+      super(eventType, source, nodeLostTime, Collections.singletonMap(NODE_NAME, nodeRemoved));
     }
   }
 }
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
@@ -77,7 +78,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
     this.zkController = zkController;
     zkStateReader = zkController.getZkStateReader();
     zkClient = zkController.getZkClient();
-    scheduledTriggers = new ScheduledTriggers();
+    scheduledTriggers = new ScheduledTriggers(zkClient);
     triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
   }
@@ -21,11 +21,11 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
 
@@ -35,9 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +54,7 @@ public class ScheduledTriggers implements Closeable {
   static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
   static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
 
-  private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
+  private final Map<String, ScheduledTrigger> scheduledTriggers = new ConcurrentHashMap<>();
 
   /**
    * Thread pool for scheduling the triggers
 
@@ -70,7 +74,11 @@ public class ScheduledTriggers implements Closeable {
 
   private final ActionThrottle actionThrottle;
 
-  public ScheduledTriggers() {
+  private final SolrZkClient zkClient;
+
+  private final Overseer.Stats queueStats;
+
+  public ScheduledTriggers(SolrZkClient zkClient) {
     // todo make the core pool size configurable
     // it is important to use more than one because a time taking trigger can starve other scheduled triggers
     // ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand
 
@@ -83,6 +91,8 @@ public class ScheduledTriggers implements Closeable {
     actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
     // todo make the wait time configurable
     actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
+    this.zkClient = zkClient;
+    queueStats = new Overseer.Stats();
   }
 
   /**
 
@@ -97,7 +107,7 @@ public class ScheduledTriggers implements Closeable {
     if (isClosed) {
       throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
     }
-    ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger);
+    ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger, zkClient, queueStats);
     ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger);
     if (old != null) {
       if (old.trigger.equals(newTrigger)) {
 
@@ -106,20 +116,34 @@ public class ScheduledTriggers implements Closeable {
       }
       IOUtils.closeQuietly(old);
       newTrigger.restoreState(old.trigger);
+      scheduledTrigger.setReplay(false);
       scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
     }
     newTrigger.setListener(event -> {
-      AutoScaling.Trigger source = event.getSource();
+      ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
+      if (scheduledSource == null) {
+        log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + event.getSource() + " doesn't exist.");
+        return false;
+      }
+      boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
+      AutoScaling.Trigger source = scheduledSource.trigger;
      if (source.isClosed()) {
-        log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed");
-        // we do not want to lose this event just because the trigger were closed, perhaps a replacement will need it
+        log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + source + " has already been closed");
+        // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
        return false;
      }
      if (hasPendingActions.compareAndSet(false, true)) {
+        final boolean enqueued;
+        if (replaying) {
+          enqueued = false;
+        } else {
+          enqueued = scheduledTrigger.enqueue(event);
+        }
        List<TriggerAction> actions = source.getActions();
        if (actions != null) {
          actionExecutor.submit(() -> {
            assert hasPendingActions.get();
+            log.debug("-- processing actions for " + event);
            try {
              // let the action executor thread wait instead of the trigger thread so we use the throttle here
              actionThrottle.minimumWaitBetweenActions();
 
@@ -132,10 +156,23 @@ public class ScheduledTriggers implements Closeable {
                throw e;
              }
            }
+            if (enqueued) {
+              TriggerEvent ev = scheduledTrigger.dequeue();
+              assert ev.getId().equals(event.getId());
+            }
+          } finally {
            hasPendingActions.set(false);
+          }
          });
        } else {
+          if (enqueued) {
+            TriggerEvent ev = scheduledTrigger.dequeue();
+            if (!ev.getId().equals(event.getId())) {
+              throw new RuntimeException("Wrong event dequeued, queue of " + scheduledTrigger.trigger.getName()
+                  + " is broken! Expected event=" + event + " but got " + ev);
+            }
+          }
          hasPendingActions.set(false);
        }
        return true;
      } else {
 
@@ -148,19 +185,37 @@ public class ScheduledTriggers implements Closeable {
   }
 
   /**
-   * Removes and stops the trigger with the given name
+   * Removes and stops the trigger with the given name. Also cleans up any leftover
+   * state / events in ZK.
    *
    * @param triggerName the name of the trigger to be removed
   * @throws AlreadyClosedException if this class has already been closed
   */
  public synchronized void remove(String triggerName) {
    if (isClosed) {
      throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used any more");
    }
    ScheduledTrigger removed = scheduledTriggers.remove(triggerName);
    IOUtils.closeQuietly(removed);
+    removeTriggerZKData(triggerName);
+  }
+
+  private void removeTriggerZKData(String triggerName) {
+    String statePath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName;
+    String eventsPath = ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName;
+    try {
+      if (zkClient.exists(statePath, true)) {
+        zkClient.delete(statePath, -1, true);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Failed to remove state for removed trigger " + statePath, e);
+    }
+    try {
+      if (zkClient.exists(eventsPath, true)) {
+        zkClient.delete(eventsPath, -1, true);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Failed to remove events for removed trigger " + eventsPath, e);
+    }
  }
 
  /**
   * @return an unmodifiable set of names of all triggers being managed by this class
   */
 
@@ -178,36 +233,91 @@ public class ScheduledTriggers implements Closeable {
      }
      scheduledTriggers.clear();
    }
-    ExecutorUtil.shutdownAndAwaitTermination(scheduledThreadPoolExecutor);
-    ExecutorUtil.shutdownAndAwaitTermination(actionExecutor);
+    // shutdown and interrupt all running tasks because there's no longer any
+    // guarantee about cluster state
+    scheduledThreadPoolExecutor.shutdownNow();
+    actionExecutor.shutdownNow();
  }
 
  private class ScheduledTrigger implements Runnable, Closeable {
    AutoScaling.Trigger trigger;
    ScheduledFuture<?> scheduledFuture;
+    TriggerEventQueue queue;
+    boolean replay;
    volatile boolean isClosed;
 
-    ScheduledTrigger(AutoScaling.Trigger trigger) {
+    ScheduledTrigger(AutoScaling.Trigger trigger, SolrZkClient zkClient, Overseer.Stats stats) {
      this.trigger = trigger;
+      this.queue = new TriggerEventQueue(zkClient, trigger.getName(), stats);
+      this.replay = true;
+      this.isClosed = false;
    }
 
+    public void setReplay(boolean replay) {
+      this.replay = replay;
+    }
+
+    public boolean enqueue(TriggerEvent event) {
+      if (isClosed) {
+        throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
+      }
+      return queue.offerEvent(event);
+    }
+
+    public TriggerEvent dequeue() {
+      if (isClosed) {
+        throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
+      }
+      TriggerEvent event = queue.pollEvent();
+      return event;
+    }
+
    @Override
    public void run() {
+      if (isClosed) {
+        throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
+      }
      // fire a trigger only if an action is not pending
      // note this is not fool proof e.g. it does not prevent an action being executed while a trigger
      // is still executing. There is additional protection against that scenario in the event listener.
      if (!hasPendingActions.get()) {
+        // replay accumulated events on first run, if any
+        if (replay) {
+          TriggerEvent event;
+          // peek first without removing - we may crash before calling the listener
+          while ((event = queue.peekEvent()) != null) {
+            // override REPLAYING=true
+            event.getProperties().put(TriggerEvent.REPLAYING, true);
+            if (! trigger.getListener().triggerFired(event)) {
+              log.error("Failed to re-play event, discarding: " + event);
+            }
+            queue.pollEvent(); // always remove it from queue
+          }
+          // now restore saved state to possibly generate new events from old state on the first run
+          try {
+            trigger.restoreState();
+          } catch (Exception e) {
+            // log but don't throw - see below
+            log.error("Error restoring trigger state " + trigger.getName(), e);
+          }
+          replay = false;
+        }
        try {
          trigger.run();
        } catch (Exception e) {
          // log but do not propagate exception because an exception thrown from a scheduled operation
          // will suppress future executions
          log.error("Unexpected execution from trigger: " + trigger.getName(), e);
+        } finally {
+          // checkpoint after each run
+          trigger.saveState();
        }
      }
    }
 
    @Override
    public void close() throws IOException {
      isClosed = true;
      if (scheduledFuture != null) {
        scheduledFuture.cancel(true);
      }
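Reviewer note: taken together, enqueue/dequeue plus the replay pass give the listener
at-least-once semantics across Overseer restarts. The flow, condensed from the code above:

    // Inside the listener installed by add(newTrigger), per event:
    // 1. persist the event to ZK before acting on it (skipped when replaying an old one)
    boolean enqueued = replaying ? false : scheduledTrigger.enqueue(event);
    // 2. run the trigger's actions on the single-threaded action executor
    // 3. only after the actions complete, remove the persisted copy
    if (enqueued) {
      TriggerEvent ev = scheduledTrigger.dequeue();
      assert ev.getId().equals(event.getId());
    }
    // If the node dies between steps 1 and 3, ScheduledTrigger.run() re-plays the
    // event from the ZK queue on its first run after restart (see the replay flag).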
@@ -30,5 +30,5 @@ public interface TriggerAction extends MapInitializedPlugin, Closeable {
 
   public String getClassName();
 
-  public void process(AutoScaling.TriggerEvent event);
+  public void process(TriggerEvent event);
 }
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} implementations.
+ * It handles state snapshot / restore in ZK.
+ */
+public abstract class TriggerBase implements AutoScaling.Trigger {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected SolrZkClient zkClient;
+  protected Map<String,Object> lastState;
+
+
+  protected TriggerBase(SolrZkClient zkClient) {
+    this.zkClient = zkClient;
+    try {
+      zkClient.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, false, true);
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception checking ZK path " + ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, e);
+    }
+  }
+
+  /**
+   * Prepare and return internal state of this trigger in a format suitable for persisting in ZK.
+   * @return map of internal state properties. Note: values must be supported by {@link Utils#toJSON(Object)}.
+   */
+  protected abstract Map<String,Object> getState();
+
+  /**
+   * Restore internal state of this trigger from properties retrieved from ZK.
+   * @param state never null but may be empty.
+   */
+  protected abstract void setState(Map<String,Object> state);
+
+  @Override
+  public void saveState() {
+    Map<String,Object> state = Utils.getDeepCopy(getState(), 10, false, true);
+    if (lastState != null && lastState.equals(state)) {
+      // skip saving if identical
+      return;
+    }
+    byte[] data = Utils.toJSON(state);
+    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
+    try {
+      if (zkClient.exists(path, true)) {
+        // update
+        zkClient.setData(path, data, -1, true);
+      } else {
+        // create
+        zkClient.create(path, data, CreateMode.PERSISTENT, true);
+      }
+      lastState = state;
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception updating trigger state '" + path + "'", e);
+    }
+  }
+
+  @Override
+  public void restoreState() {
+    byte[] data = null;
+    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
+    try {
+      if (zkClient.exists(path, true)) {
+        data = zkClient.getData(path, null, new Stat(), true);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception getting trigger state '" + path + "'", e);
+    }
+    if (data != null) {
+      Map<String, Object> state = (Map<String, Object>)Utils.fromJSON(data);
+      // make sure lastState is sorted
+      state = Utils.getDeepCopy(state, 10, false, true);
+      setState(state);
+      lastState = state;
+    }
+  }
+}
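Reviewer note: saveState() deliberately compares against the last-written snapshot to
avoid redundant ZK writes; both sides go through Utils.getDeepCopy(..., sorted=true)
(see the "make sure lastState is sorted" comment in restoreState()), so map iteration
order cannot defeat the equality check. The guard, condensed:

    Map<String, Object> state = Utils.getDeepCopy(getState(), 10, false, true); // immutable, sorted copy
    if (state.equals(lastState)) {
      return; // nothing changed since the last checkpoint, skip the ZK write
    }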
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.util.IdUtils;
+
+/**
+ * Trigger event.
+ */
+public class TriggerEvent implements MapWriter {
+  public static final String REPLAYING = "replaying";
+  public static final String NODE_NAME = "nodeName";
+
+  protected final String id;
+  protected final String source;
+  protected final long eventTime;
+  protected final AutoScaling.EventType eventType;
+  protected final Map<String, Object> properties = new HashMap<>();
+
+  public TriggerEvent(AutoScaling.EventType eventType, String source, long eventTime,
+                      Map<String, Object> properties) {
+    this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties);
+  }
+
+  public TriggerEvent(String id, AutoScaling.EventType eventType, String source, long eventTime,
+                      Map<String, Object> properties) {
+    this.id = id;
+    this.eventType = eventType;
+    this.source = source;
+    this.eventTime = eventTime;
+    if (properties != null) {
+      this.properties.putAll(properties);
+    }
+  }
+
+  /**
+   * Unique event id.
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Name of the trigger that fired the event.
+   */
+  public String getSource() {
+    return source;
+  }
+
+  /**
+   * Timestamp of the actual event, in nanoseconds.
+   * NOTE: this is NOT the timestamp when the event was fired - events may be fired
+   * much later than the actual condition that generated the event, due to the "waitFor" limit.
+   */
+  public long getEventTime() {
+    return eventTime;
+  }
+
+  /**
+   * Get event properties (modifiable).
+   */
+  public Map<String, Object> getProperties() {
+    return properties;
+  }
+
+  /**
+   * Get a named event property or null if missing.
+   */
+  public Object getProperty(String name) {
+    return properties.get(name);
+  }
+
+  /**
+   * Event type.
+   */
+  public AutoScaling.EventType getEventType() {
+    return eventType;
+  }
+
+  /**
+   * Set event properties.
+   *
+   * @param properties may be null. A shallow copy of this parameter is used.
+   */
+  public void setProperties(Map<String, Object> properties) {
+    this.properties.clear();
+    if (properties != null) {
+      this.properties.putAll(properties);
+    }
+  }
+
+  @Override
+  public void writeMap(EntryWriter ew) throws IOException {
+    ew.put("id", id);
+    ew.put("source", source);
+    ew.put("eventTime", eventTime);
+    ew.put("eventType", eventType.toString());
+    ew.put("properties", properties);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TriggerEvent that = (TriggerEvent) o;
+
+    if (eventTime != that.eventTime) return false;
+    if (!id.equals(that.id)) return false;
+    if (!source.equals(that.source)) return false;
+    if (eventType != that.eventType) return false;
+    return properties.equals(that.properties);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = id.hashCode();
+    result = 31 * result + source.hashCode();
+    result = 31 * result + (int) (eventTime ^ (eventTime >>> 32));
+    result = 31 * result + eventType.hashCode();
+    result = 31 * result + properties.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + "{" +
+        "id='" + id + '\'' +
+        ", source='" + source + '\'' +
+        ", eventTime=" + eventTime +
+        ", properties=" + properties +
+        '}';
+  }
+}
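Reviewer note (illustration, not part of the diff): because TriggerEvent implements
MapWriter, Utils.toJSON can serialize it directly — TriggerEventQueue below relies on
exactly that. A sketch, with the trigger and node names invented:

    TriggerEvent event = new TriggerEvent(AutoScaling.EventType.NODELOST, "node_lost_trigger1",
        TimeSource.CURRENT_TIME.getTime(), Collections.singletonMap(TriggerEvent.NODE_NAME, "lost_node:8983_solr"));
    byte[] data = Utils.toJSON(event); // {"id":"...T...","source":"node_lost_trigger1","eventTime":...,...}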
@@ -0,0 +1,100 @@
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+
+import org.apache.solr.cloud.DistributedQueue;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ZooKeeper-backed queue of {@link TriggerEvent}s for a single trigger.
+ */
+public class TriggerEventQueue extends DistributedQueue {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String ENQUEUE_TIME = "_enqueue_time_";
+  public static final String DEQUEUE_TIME = "_dequeue_time_";
+
+  private final String triggerName;
+  private final TimeSource timeSource;
+
+  public TriggerEventQueue(SolrZkClient zookeeper, String triggerName, Overseer.Stats stats) {
+    super(zookeeper, ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName, stats);
+    this.triggerName = triggerName;
+    this.timeSource = TimeSource.CURRENT_TIME;
+  }
+
+  public boolean offerEvent(TriggerEvent event) {
+    event.getProperties().put(ENQUEUE_TIME, timeSource.getTime());
+    try {
+      byte[] data = Utils.toJSON(event);
+      offer(data);
+      return true;
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception adding event " + event + " to queue " + triggerName, e);
+      return false;
+    }
+  }
+
+  public TriggerEvent peekEvent() {
+    byte[] data;
+    try {
+      while ((data = peek()) != null) {
+        if (data.length == 0) {
+          LOG.warn("ignoring empty data...");
+          continue;
+        }
+        try {
+          Map<String, Object> map = (Map<String, Object>) Utils.fromJSON(data);
+          return fromMap(map);
+        } catch (Exception e) {
+          LOG.warn("Invalid event data, ignoring: " + new String(data));
+          continue;
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception peeking queue of trigger " + triggerName, e);
+    }
+    return null;
+  }
+
+  public TriggerEvent pollEvent() {
+    byte[] data;
+    try {
+      while ((data = poll()) != null) {
+        if (data.length == 0) {
+          LOG.warn("ignoring empty data...");
+          continue;
+        }
+        try {
+          Map<String, Object> map = (Map<String, Object>) Utils.fromJSON(data);
+          return fromMap(map);
+        } catch (Exception e) {
+          LOG.warn("Invalid event data, ignoring: " + new String(data));
+          continue;
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception polling queue of trigger " + triggerName, e);
+    }
+    return null;
+  }
+
+  private TriggerEvent fromMap(Map<String, Object> map) {
+    String id = (String)map.get("id");
+    String source = (String)map.get("source");
+    long eventTime = ((Number)map.get("eventTime")).longValue();
+    AutoScaling.EventType eventType = AutoScaling.EventType.valueOf((String)map.get("eventType"));
+    Map<String, Object> properties = (Map<String, Object>)map.get("properties");
+    TriggerEvent res = new TriggerEvent(id, eventType, source, eventTime, properties);
+    res.getProperties().put(DEQUEUE_TIME, timeSource.getTime());
+    return res;
+  }
+}
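Reviewer note: peekEvent/pollEvent exist as a pair so a consumer can read an event, act
on it, and only then delete it — the crash-safe ordering the replay logic in
ScheduledTriggers depends on. Typical use, assuming a zkClient and stats object at hand:

    TriggerEventQueue queue = new TriggerEventQueue(zkClient, "node_added_trigger1", stats);
    queue.offerEvent(event);                  // stamps _enqueue_time_ and persists to ZK
    TriggerEvent pending = queue.peekEvent(); // read without removing
    // ... process 'pending' ...
    queue.pollEvent();                        // remove only after processing succeeded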
@@ -0,0 +1,39 @@
+package org.apache.solr.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.StringHelper;
+
+/**
+ * Helper class for generating unique ID-s.
+ */
+public class IdUtils {
+
+  /**
+   * Generate a short random id (see {@link StringHelper#randomId()}).
+   */
+  public static final String randomId() {
+    return StringHelper.idToString(StringHelper.randomId());
+  }
+
+  /**
+   * Generate a random id with a timestamp, in the format:
+   * <code>hex(timestamp) + 'T' + randomId</code>. This method
+   * uses {@link TimeSource#CURRENT_TIME} for timestamp values.
+   */
+  public static final String timeRandomId() {
+    return timeRandomId(TimeUnit.MILLISECONDS.convert(TimeSource.CURRENT_TIME.getTime(), TimeUnit.NANOSECONDS));
+  }
+
+  /**
+   * Generate a random id with a timestamp, in the format:
+   * <code>hex(timestamp) + 'T' + randomId</code>.
+   * @param time value representing timestamp
+   */
+  public static final String timeRandomId(long time) {
+    StringBuilder sb = new StringBuilder(Long.toHexString(time));
+    sb.append('T');
+    sb.append(randomId());
+    return sb.toString();
+  }
+}
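Reviewer note: the hex-millis prefix makes these IDs sort roughly by creation time while
the random suffix keeps them unique. For example (the suffix differs on every call):

    String id = IdUtils.timeRandomId(1500000000000L); // epoch millis
    // => "15d3ef79800T" + StringHelper.idToString(StringHelper.randomId())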
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.util.SuppressForbidden;
+
+/**
+ * Source of timestamps.
+ */
+public abstract class TimeSource {
+
+  /** Implementation that uses {@link System#currentTimeMillis()}. */
+  public static final class CurrentTimeSource extends TimeSource {
+
+    @Override
+    @SuppressForbidden(reason = "Needed to provide timestamps based on currentTimeMillis.")
+    public long getTime() {
+      return TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /** Implementation that uses {@link System#nanoTime()}. */
+  public static final class NanoTimeSource extends TimeSource {
+
+    @Override
+    public long getTime() {
+      return System.nanoTime();
+    }
+  }
+
+  /** This instance uses {@link CurrentTimeSource} for generating timestamps. */
+  public static final TimeSource CURRENT_TIME = new CurrentTimeSource();
+
+  /** This instance uses {@link NanoTimeSource} for generating timestamps. */
+  public static final TimeSource NANO_TIME = new NanoTimeSource();
+
+  /**
+   * Return a timestamp, in nanosecond unit.
+   */
+  public abstract long getTime();
+}
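A quick sketch (assumed usage, not from the commit) contrasting the two built-in sources: NANO_TIME is monotonic and suited to measuring elapsed time, while CURRENT_TIME is wall-clock based, reported in nanoseconds, and suited to timestamps that are meaningful across JVMs:

    import java.util.concurrent.TimeUnit;
    import org.apache.solr.util.TimeSource;

    public class TimeSourceDemo {
      public static void main(String[] args) throws InterruptedException {
        // elapsed-time measurement: use the monotonic nanoTime-based source
        long start = TimeSource.NANO_TIME.getTime();
        Thread.sleep(50);
        long elapsedMs = TimeUnit.MILLISECONDS.convert(
            TimeSource.NANO_TIME.getTime() - start, TimeUnit.NANOSECONDS);
        System.out.println("elapsed ~" + elapsedMs + "ms");

        // wall-clock timestamp: CURRENT_TIME reports currentTimeMillis scaled to nanos
        long nowMs = TimeUnit.MILLISECONDS.convert(
            TimeSource.CURRENT_TIME.getTime(), TimeUnit.NANOSECONDS);
        System.out.println("epoch ms: " + nowMs);
      }
    }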
@@ -21,12 +21,12 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.cloud.ActionThrottle.NanoTimeSource;
+import org.apache.solr.util.TimeSource;
 import org.junit.Test;
 
 public class ActionThrottleTest extends SolrTestCaseJ4 {
 
-  static class TestNanoTimeSource implements NanoTimeSource {
+  static class TestNanoTimeSource extends TimeSource {
 
     private List<Long> returnValues;
     private int index = 0;
 
@@ -41,35 +41,38 @@ public class ActionThrottleTest extends SolrTestCaseJ4 {
     }
 
   }
 
+  // use the same time source as ActionThrottle
+  private static final TimeSource timeSource = TimeSource.NANO_TIME;
+
   @Test
   public void testBasics() throws Exception {
 
     ActionThrottle at = new ActionThrottle("test", 1000);
-    long start = System.nanoTime();
+    long start = timeSource.getTime();
 
     at.minimumWaitBetweenActions();
 
     // should be no wait
-    assertTrue(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS) < 1000);
+    assertTrue(TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS) < 1000);
     at.markAttemptingAction();
 
     if (random().nextBoolean()) Thread.sleep(100);
 
     at.minimumWaitBetweenActions();
 
-    long elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
+    long elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS);
 
     assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);
 
-    start = System.nanoTime();
+    start = timeSource.getTime();
 
     at.markAttemptingAction();
     at.minimumWaitBetweenActions();
 
     Thread.sleep(random().nextInt(1000));
 
-    elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
+    elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS);
 
     assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);
   }
 
@@ -78,13 +81,13 @@ public class ActionThrottleTest extends SolrTestCaseJ4 {
   public void testAZeroNanoTimeReturnInWait() throws Exception {
 
     ActionThrottle at = new ActionThrottle("test", 1000, new TestNanoTimeSource(Arrays.asList(new Long[]{0L, 10L})));
-    long start = System.nanoTime();
+    long start = timeSource.getTime();
 
     at.markAttemptingAction();
 
     at.minimumWaitBetweenActions();
 
-    long elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
+    long elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS);
 
     assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);
 
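The TestNanoTimeSource above (its accessor body is elided by the hunk) suggests the general pattern: inject a scripted TimeSource so throttle timing becomes deterministic. A sketch under that assumption, with hypothetical names:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.solr.util.TimeSource;

    class ScriptedTimeSource extends TimeSource {
      private final List<Long> returnValues;
      private int index = 0;

      ScriptedTimeSource(List<Long> returnValues) {
        this.returnValues = returnValues;
      }

      @Override
      public long getTime() {
        // each call returns the next scripted timestamp, in nanoseconds
        return returnValues.get(index++);
      }
    }

    // usage, mirroring testAZeroNanoTimeReturnInWait:
    //   ActionThrottle at = new ActionThrottle("test", 1000,
    //       new ScriptedTimeSource(Arrays.asList(0L, 10L)));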
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.TimeSource;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -42,11 +43,13 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
   private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
   private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
 
-  private AutoScaling.TriggerListener<NodeAddedTrigger.NodeAddedEvent> noFirstRunListener = event -> {
+  private AutoScaling.TriggerListener noFirstRunListener = event -> {
     fail("Did not expect the listener to fire on first run!");
     return true;
   };
 
+  private static final TimeSource timeSource = TimeSource.CURRENT_TIME;
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(1)
 
@@ -73,11 +76,11 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
 
     JettySolrRunner newNode = cluster.startJettySolrRunner();
     AtomicBoolean fired = new AtomicBoolean(false);
-    AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
+    AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
     trigger.setListener(event -> {
       if (fired.compareAndSet(false, true)) {
         eventRef.set(event);
-        if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
+        if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
          fail("NodeAddedListener was fired before the configured waitFor period");
         }
       } else {
 
@@ -94,9 +97,9 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
       }
     } while (!fired.get());
 
-    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
+    TriggerEvent nodeAddedEvent = eventRef.get();
     assertNotNull(nodeAddedEvent);
-    assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
+    assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
   }
 
   // add a new node but remove it before the waitFor period expires
 
@@ -111,7 +114,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
     AtomicBoolean fired = new AtomicBoolean(false);
     trigger.setListener(event -> {
       if (fired.compareAndSet(false, true)) {
-        if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
+        if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
           fail("NodeAddedListener was fired before the configured waitFor period");
         }
       } else {
 
@@ -170,7 +173,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
     }
 
     @Override
-    public void process(AutoScaling.TriggerEvent event) {
+    public void process(TriggerEvent event) {
 
     }
 
@@ -246,11 +249,11 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
 
     try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
       AtomicBoolean fired = new AtomicBoolean(false);
-      AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
       newTrigger.setListener(event -> {
         if (fired.compareAndSet(false, true)) {
           eventRef.set(event);
-          if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
+          if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
            fail("NodeAddedListener was fired before the configured waitFor period");
           }
         } else {
 
@@ -270,9 +273,9 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
 
       // ensure the event was fired
       assertTrue(fired.get());
-      NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
+      TriggerEvent nodeAddedEvent = eventRef.get();
       assertNotNull(nodeAddedEvent);
-      assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
+      assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
     }
   }
 
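The listener assertions above all reduce to one check: measure the age of the event with the same TimeSource the trigger uses, and fail if it fired before the waitFor window elapsed. A condensed sketch (the helper name is illustrative):

    import java.util.concurrent.TimeUnit;
    import org.apache.solr.util.TimeSource;

    class WaitForCheck {
      private static final TimeSource timeSource = TimeSource.CURRENT_TIME;

      /** True if at least waitForSeconds have elapsed since eventTimeNanos. */
      static boolean waitForElapsed(long eventTimeNanos, int waitForSeconds) {
        long waitNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS);
        // event time and "now" come from the same source, so the difference is meaningful
        return timeSource.getTime() - eventTimeNanos > waitNanos;
      }
    }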
@@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.TimeSource;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -43,11 +43,14 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
   private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
   private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
 
-  private AutoScaling.TriggerListener<NodeLostTrigger.NodeLostEvent> noFirstRunListener = event -> {
+  private AutoScaling.TriggerListener noFirstRunListener = event -> {
     fail("Did not expect the listener to fire on first run!");
     return true;
   };
 
+  // use the same time source as the trigger
+  private final TimeSource timeSource = TimeSource.CURRENT_TIME;
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(5)
 
@@ -75,11 +78,11 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
     cluster.stopJettySolrRunner(1);
 
     AtomicBoolean fired = new AtomicBoolean(false);
-    AtomicReference<NodeLostTrigger.NodeLostEvent> eventRef = new AtomicReference<>();
+    AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
     trigger.setListener(event -> {
       if (fired.compareAndSet(false, true)) {
         eventRef.set(event);
-        if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
+        if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
           fail("NodeLostListener was fired before the configured waitFor period");
         }
       } else {
 
@@ -96,9 +99,9 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
       }
     } while (!fired.get());
 
-    NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
+    TriggerEvent nodeLostEvent = eventRef.get();
     assertNotNull(nodeLostEvent);
-    assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
+    assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
 
   }
 
@@ -115,7 +118,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
     AtomicBoolean fired = new AtomicBoolean(false);
     trigger.setListener(event -> {
       if (fired.compareAndSet(false, true)) {
-        if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
+        if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
           fail("NodeLostListener was fired before the configured waitFor period");
         }
       } else {
 
@@ -184,7 +187,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
     }
 
     @Override
-    public void process(AutoScaling.TriggerEvent event) {
+    public void process(TriggerEvent event) {
 
     }
 
@@ -284,11 +287,11 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
 
     try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
       AtomicBoolean fired = new AtomicBoolean(false);
-      AtomicReference<NodeLostTrigger.NodeLostEvent> eventRef = new AtomicReference<>();
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
       newTrigger.setListener(event -> {
         if (fired.compareAndSet(false, true)) {
           eventRef.set(event);
-          if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
+          if (timeSource.getTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
            fail("NodeLostListener was fired before the configured waitFor period");
           }
         } else {
 
@@ -306,9 +309,9 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
       }
       } while (!fired.get());
 
-      NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
+      TriggerEvent nodeLostEvent = eventRef.get();
       assertNotNull(nodeLostEvent);
-      assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
+      assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
     }
   }
 
@@ -39,6 +39,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.LogLevel;
 import org.apache.solr.util.TimeOut;
+import org.apache.solr.util.TimeSource;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
@@ -60,11 +61,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   private static CountDownLatch actionInitCalled;
   private static CountDownLatch triggerFiredLatch;
   private static int waitForSeconds = 1;
+  private static CountDownLatch actionStarted;
+  private static CountDownLatch actionInterrupted;
+  private static CountDownLatch actionCompleted;
   private static AtomicBoolean triggerFired;
-  private static AtomicReference<AutoScaling.TriggerEvent> eventRef;
+  private static AtomicReference<TriggerEvent> eventRef;
 
   private String path;
 
+  // use the same time source as triggers use
+  private static final TimeSource timeSource = TimeSource.CURRENT_TIME;
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(2)
 
@@ -72,6 +79,22 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         .configure();
   }
 
+  private static CountDownLatch getTriggerFiredLatch() {
+    return triggerFiredLatch;
+  }
+
+  private static CountDownLatch getActionStarted() {
+    return actionStarted;
+  }
+
+  private static CountDownLatch getActionInterrupted() {
+    return actionInterrupted;
+  }
+
+  private static CountDownLatch getActionCompleted() {
+    return actionCompleted;
+  }
+
   @Before
   public void setupTest() throws Exception {
     waitForSeconds = 1 + random().nextInt(3);
 
@@ -79,6 +102,9 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     actionInitCalled = new CountDownLatch(1);
     triggerFiredLatch = new CountDownLatch(1);
     triggerFired = new AtomicBoolean(false);
+    actionStarted = new CountDownLatch(1);
+    actionInterrupted = new CountDownLatch(1);
+    actionCompleted = new CountDownLatch(1);
     eventRef = new AtomicReference<>();
     // clear any persisted auto scaling configuration
     Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
@@ -96,7 +122,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   @Test
   public void testTriggerThrottling() throws Exception {
     // for this test we want to create two triggers so we must assert that the actions were created twice
-    TriggerIntegrationTest.actionInitCalled = new CountDownLatch(2);
+    actionInitCalled = new CountDownLatch(2);
     // similarly we want both triggers to fire
     triggerFiredLatch = new CountDownLatch(2);
 
@@ -109,7 +135,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'event' : 'nodeAdded'," +
         "'waitFor' : '0s'," +
         "'enabled' : true," +
-        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
         "}}";
     SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
     NamedList<Object> response = solrClient.request(req);
 
@@ -122,7 +148,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'event' : 'nodeAdded'," +
         "'waitFor' : '0s'," +
         "'enabled' : true," +
-        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
         "}}";
     req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
     response = solrClient.request(req);
 
@@ -135,7 +161,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     JettySolrRunner newNode = cluster.startJettySolrRunner();
 
-    if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
+    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
       fail("Both triggers should have fired by now");
     }
 
@@ -150,7 +176,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'event' : 'nodeLost'," +
         "'waitFor' : '0s'," +
         "'enabled' : true," +
-        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
         "}}";
     req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
     response = solrClient.request(req);
 
@@ -162,7 +188,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'event' : 'nodeLost'," +
         "'waitFor' : '0s'," +
         "'enabled' : true," +
-        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
         "}}";
     req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
     response = solrClient.request(req);
 
@@ -183,14 +209,14 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
       }
     }
 
-    if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
+    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
       fail("Both triggers should have fired by now");
     }
   }
 
   static AtomicLong lastActionExecutedAt = new AtomicLong(0);
   static ReentrantLock lock = new ReentrantLock();
-  public static class ThrottingTesterAction extends TestTriggerAction {
+  public static class ThrottlingTesterAction extends TestTriggerAction {
     // nanos are very precise so we need a delta for comparison with ms
     private static final long DELTA_MS = 2;
 
@@ -198,7 +224,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
 
     @Override
-    public void process(AutoScaling.TriggerEvent event) {
+    public void process(TriggerEvent event) {
       boolean locked = lock.tryLock();
       if (!locked) {
         log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
 
@@ -206,18 +232,18 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
       }
       try {
         if (lastActionExecutedAt.get() != 0) {
-          log.info("last action at " + lastActionExecutedAt.get() + " nano time = " + System.nanoTime());
-          if (System.nanoTime() - lastActionExecutedAt.get() < TimeUnit.NANOSECONDS.convert(ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS, TimeUnit.MILLISECONDS)) {
-            log.info("action executed again before minimum wait time from {}", event.getSource().getName());
+          log.info("last action at " + lastActionExecutedAt.get() + " time = " + timeSource.getTime());
+          if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - lastActionExecutedAt.get(), TimeUnit.NANOSECONDS) < ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS) {
+            log.info("action executed again before minimum wait time from {}", event.getSource());
             fail("TriggerListener was fired before the throttling period");
           }
         }
         if (onlyOnce.compareAndSet(false, true)) {
-          log.info("action executed from {}", event.getSource().getName());
-          lastActionExecutedAt.set(System.nanoTime());
-          triggerFiredLatch.countDown();
+          log.info("action executed from {}", event.getSource());
+          lastActionExecutedAt.set(timeSource.getTime());
+          getTriggerFiredLatch().countDown();
         } else {
-          log.info("action executed more than once from {}", event.getSource().getName());
+          log.info("action executed more than once from {}", event.getSource());
           fail("Trigger should not have fired more than once!");
         }
       } finally {
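The reworked comparison above converts elapsed nanoseconds to milliseconds before comparing against the millisecond threshold, instead of converting the threshold to nanoseconds. A condensed sketch of that check; the constant value is an assumption, since ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS is defined outside this hunk:

    import java.util.concurrent.TimeUnit;

    class ThrottleCheck {
      // assumed placeholder; the real value lives in ScheduledTriggers
      static final long DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
      // nanos are more precise than ms, so allow a small delta (from the diff)
      static final long DELTA_MS = 2;

      /** True if the gap since the last action is shorter than the throttle allows. */
      static boolean firedTooSoon(long lastActionNanos, long nowNanos) {
        long elapsedMs = TimeUnit.MILLISECONDS.convert(nowNanos - lastActionNanos, TimeUnit.NANOSECONDS);
        return elapsedMs < DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS;
      }
    }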
@@ -293,7 +319,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
     assertNotNull(nodeLostEvent);
     assertEquals("The node added trigger was fired but for a different node",
-        nodeName, nodeLostEvent.getNodeName());
+        nodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
   }
 
   @Test
 
@@ -351,7 +377,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
-        newNode.getNodeName(), nodeAddedEvent.getNodeName());
+        newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
   }
 
   @Test
 
@@ -380,7 +406,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
-        newNode.getNodeName(), nodeAddedEvent.getNodeName());
+        newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
 
     // reset
     actionConstructorCalled = new CountDownLatch(1);
 
@@ -443,7 +469,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
     assertNotNull(nodeLostEvent);
     assertEquals("The node lost trigger was fired but for a different node",
-        lostNodeName, nodeLostEvent.getNodeName());
+        lostNodeName, nodeLostEvent.getProperty(TriggerEvent.NODE_NAME));
 
     // reset
     actionConstructorCalled = new CountDownLatch(1);
 
@@ -507,7 +533,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     // stop the overseer, somebody else will take over as the overseer
     cluster.stopJettySolrRunner(index);
-
+    Thread.sleep(10000);
     JettySolrRunner newNode = cluster.startJettySolrRunner();
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
 
@@ -515,7 +541,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
-        newNode.getNodeName(), nodeAddedEvent.getNodeName());
+        newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
   }
 
   public static class TestTriggerAction implements TriggerAction {
 
@@ -535,15 +561,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     }
 
     @Override
-    public void process(AutoScaling.TriggerEvent event) {
-      if (triggerFired.compareAndSet(false, true)) {
-        eventRef.set(event);
-        if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
-          fail("NodeAddedListener was fired before the configured waitFor period");
+    public void process(TriggerEvent event) {
+      try {
+        if (triggerFired.compareAndSet(false, true)) {
+          eventRef.set(event);
+          if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - event.getEventTime(), TimeUnit.NANOSECONDS) <= TimeUnit.MILLISECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
+            fail("NodeAddedListener was fired before the configured waitFor period");
+          }
+          getTriggerFiredLatch().countDown();
+        } else {
+          fail("NodeAddedTrigger was fired more than once!");
         }
-        triggerFiredLatch.countDown();
-      } else {
-        fail("NodeAddedTrigger was fired more than once!");
+      } catch (Throwable t) {
+        log.debug("--throwable", t);
+        throw t;
       }
     }
 
@@ -558,4 +589,152 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
       actionInitCalled.countDown();
     }
   }
+
+  public static class TestEventQueueAction implements TriggerAction {
+
+    public TestEventQueueAction() {
+      log.info("TestEventQueueAction instantiated");
+    }
+
+    @Override
+    public String getName() {
+      return this.getClass().getSimpleName();
+    }
+
+    @Override
+    public String getClassName() {
+      return this.getClass().getName();
+    }
+
+    @Override
+    public void process(TriggerEvent event) {
+      eventRef.set(event);
+      getActionStarted().countDown();
+      try {
+        Thread.sleep(5000);
+        triggerFired.compareAndSet(false, true);
+        getActionCompleted().countDown();
+      } catch (InterruptedException e) {
+        getActionInterrupted().countDown();
+        return;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      log.debug("TestTriggerAction init");
+      actionInitCalled.countDown();
+    }
+  }
+
+  @Test
+  public void testEventQueue() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger1'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
+        "}}";
+    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+    String overseerLeader = (String) overSeerStatus.get("leader");
+    int overseerLeaderIndex = 0;
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+      if (jetty.getNodeName().equals(overseerLeader)) {
+        overseerLeaderIndex = i;
+        break;
+      }
+    }
+    SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    // add node to generate the event
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    boolean await = actionStarted.await(60, TimeUnit.SECONDS);
+    assertTrue("action did not start", await);
+    // event should be there
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+    assertNotNull(nodeAddedEvent);
+    // but action did not complete yet so the event is still enqueued
+    assertFalse(triggerFired.get());
+    actionStarted = new CountDownLatch(1);
+    // kill overseer leader
+    cluster.stopJettySolrRunner(overseerLeaderIndex);
+    Thread.sleep(5000);
+    await = actionInterrupted.await(3, TimeUnit.SECONDS);
+    assertTrue("action wasn't interrupted", await);
+    // new overseer leader should be elected and run triggers
+    newNode = cluster.startJettySolrRunner();
+    // it should fire again but not complete yet
+    await = actionStarted.await(60, TimeUnit.SECONDS);
+    TriggerEvent replayedEvent = eventRef.get();
+    assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
+    assertTrue(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
+    await = actionCompleted.await(10, TimeUnit.SECONDS);
+    assertTrue(triggerFired.get());
+  }
+
+  @Test
+  public void testEventFromRestoredState() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '10s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    if (!actionInitCalled.await(10, TimeUnit.SECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+    String overseerLeader = (String) overSeerStatus.get("leader");
+    int overseerLeaderIndex = 0;
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+      if (jetty.getNodeName().equals(overseerLeader)) {
+        overseerLeaderIndex = i;
+        break;
+      }
+    }
+
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    // reset
+    triggerFired.set(false);
+    triggerFiredLatch = new CountDownLatch(1);
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+    assertNotNull(nodeAddedEvent);
+    assertEquals("The node added trigger was fired but for a different node",
+        newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
+    // add a second node - state of the trigger will change but it won't fire for waitFor sec.
+    JettySolrRunner newNode2 = cluster.startJettySolrRunner();
+    Thread.sleep(10000);
+    // kill overseer leader
+    cluster.stopJettySolrRunner(overseerLeaderIndex);
+    await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+  }
 }
@@ -92,6 +92,8 @@ public class ZkStateReader implements Closeable {
   public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
   public static final String SOLR_SECURITY_CONF_PATH = "/security.json";
   public static final String SOLR_AUTOSCALING_CONF_PATH = "/autoscaling.json";
+  public static final String SOLR_AUTOSCALING_EVENTS_PATH = "/autoscaling/events";
+  public static final String SOLR_AUTOSCALING_TRIGGER_STATE_PATH = "/autoscaling/triggerState";
 
   public static final String REPLICATION_FACTOR = "replicationFactor";
   public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
 
@@ -31,6 +31,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -53,33 +55,59 @@ public class Utils {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public static Map getDeepCopy(Map map, int maxDepth) {
-    return getDeepCopy(map, maxDepth, true);
+    return getDeepCopy(map, maxDepth, true, false);
   }
 
   public static Map getDeepCopy(Map map, int maxDepth, boolean mutable) {
+    return getDeepCopy(map, maxDepth, mutable, false);
+  }
+
+  public static Map getDeepCopy(Map map, int maxDepth, boolean mutable, boolean sorted) {
     if(map == null) return null;
     if (maxDepth < 1) return map;
-    Map copy = new LinkedHashMap();
+    Map copy;
+    if (sorted) {
+      copy = new TreeMap();
+    } else {
+      copy = new LinkedHashMap();
+    }
     for (Object o : map.entrySet()) {
       Map.Entry e = (Map.Entry) o;
-      copy.put(e.getKey(), makeDeepCopy(e.getValue(),maxDepth, mutable));
+      copy.put(e.getKey(), makeDeepCopy(e.getValue(),maxDepth, mutable, sorted));
     }
     return mutable ? copy : Collections.unmodifiableMap(copy);
   }
 
-  private static Object makeDeepCopy(Object v, int maxDepth, boolean mutable) {
-    if (v instanceof MapWriter && maxDepth > 1) v = ((MapWriter) v).toMap(new LinkedHashMap<>());
-    else if (v instanceof IteratorWriter && maxDepth > 1) v = ((IteratorWriter) v).toList(new ArrayList<>());
+  private static Object makeDeepCopy(Object v, int maxDepth, boolean mutable, boolean sorted) {
+    if (v instanceof MapWriter && maxDepth > 1) {
+      v = ((MapWriter) v).toMap(new LinkedHashMap<>());
+    } else if (v instanceof IteratorWriter && maxDepth > 1) {
+      v = ((IteratorWriter) v).toList(new ArrayList<>());
+      if (sorted) {
+        Collections.sort((List)v);
+      }
+    }
 
-    if (v instanceof Map) v = getDeepCopy((Map) v, maxDepth - 1, mutable);
-    else if (v instanceof Collection) v = getDeepCopy((Collection) v, maxDepth - 1, mutable);
+    if (v instanceof Map) {
+      v = getDeepCopy((Map) v, maxDepth - 1, mutable, sorted);
+    } else if (v instanceof Collection) {
+      v = getDeepCopy((Collection) v, maxDepth - 1, mutable, sorted);
+    }
     return v;
   }
 
   public static Collection getDeepCopy(Collection c, int maxDepth, boolean mutable) {
+    return getDeepCopy(c, maxDepth, mutable, false);
+  }
+
+  public static Collection getDeepCopy(Collection c, int maxDepth, boolean mutable, boolean sorted) {
     if (c == null || maxDepth < 1) return c;
-    Collection result = c instanceof Set ? new HashSet() : new ArrayList();
-    for (Object o : c) result.add(makeDeepCopy(o, maxDepth, mutable));
+    Collection result = c instanceof Set ?
+        (sorted ? new TreeSet() : new HashSet()) : new ArrayList();
+    for (Object o : c) result.add(makeDeepCopy(o, maxDepth, mutable, sorted));
+    if (sorted && (result instanceof List)) {
+      Collections.sort((List)result);
+    }
     return mutable ? result : result instanceof Set ? unmodifiableSet((Set) result) : unmodifiableList((List) result);
   }
 
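A small usage sketch (assumption: behavior as implemented above, where sorted=true switches to TreeMap/TreeSet and sorts list copies), handy when two deeply nested structures must serialize identically regardless of insertion order:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.solr.common.util.Utils;

    public class DeepCopyDemo {
      public static void main(String[] args) {
        Map raw = new LinkedHashMap();
        raw.put("b", Arrays.asList(3, 1, 2));
        raw.put("a", "x");
        // sorted copy: keys ordered by TreeMap, list values sorted in the copy
        Map sortedCopy = Utils.getDeepCopy(raw, 4, false, true);
        System.out.println(sortedCopy); // {a=x, b=[1, 2, 3]}
      }
    }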