SOLR-10376: Implement autoscaling trigger for nodeAdded event

Shalin Shekhar Mangar 2017-04-17 12:18:32 +05:30
parent f31546f6e6
commit 25ef04b714
13 changed files with 1030 additions and 12 deletions

View File

@ -71,6 +71,8 @@ New Features
* SOLR-10393: Adds UUID Streaming Evaluator (Dennis Gove)
* SOLR-10376: Implement autoscaling trigger for nodeAdded event. (shalin)
Bug Fixes
----------------------
* SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.codahale.metrics.Timer;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.cloud.overseer.NodeMutator;
@ -464,6 +465,8 @@ public class Overseer implements Closeable {
private OverseerThread arfoThread;
private OverseerThread triggerThread;
private final ZkStateReader reader;
private final ShardHandler shardHandler;
@ -519,10 +522,15 @@ public class Overseer implements Closeable {
OverseerAutoReplicaFailoverThread autoReplicaFailoverThread = new OverseerAutoReplicaFailoverThread(config, reader, updateShardHandler);
arfoThread = new OverseerThread(ohcfTg, autoReplicaFailoverThread, "OverseerHdfsCoreFailoverThread-" + id);
arfoThread.setDaemon(true);
ThreadGroup triggerThreadGroup = new ThreadGroup("Overseer autoscaling triggers");
OverseerTriggerThread trigger = new OverseerTriggerThread(zkController);
triggerThread = new OverseerThread(triggerThreadGroup, trigger, "OverseerAutoScalingTriggerThread-" + id);
updaterThread.start();
ccThread.start();
arfoThread.start();
triggerThread.start();
assert ObjectReleaseTracker.track(this);
}
@ -567,6 +575,10 @@ public class Overseer implements Closeable {
IOUtils.closeQuietly(arfoThread);
arfoThread.interrupt();
}
if (triggerThread != null) {
IOUtils.closeQuietly(triggerThread);
triggerThread.interrupt();
}
if (updaterThread != null) {
try {
@ -583,10 +595,16 @@ public class Overseer implements Closeable {
arfoThread.join();
} catch (InterruptedException e) {}
}
if (triggerThread != null) {
try {
triggerThread.join();
} catch (InterruptedException e) {}
}
updaterThread = null;
ccThread = null;
arfoThread = null;
triggerThread = null;
}
/**

View File

@ -1818,7 +1818,7 @@ public class ZkController {
}
}
CoreContainer getCoreContainer() {
public CoreContainer getCoreContainer() {
return cc;
}

View File

@ -17,9 +17,14 @@
package org.apache.solr.cloud.autoscaling;
import java.util.Date;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.core.CoreContainer;
public class AutoScaling {
public enum EventType {
@ -33,6 +38,7 @@ public class AutoScaling {
}
public enum TriggerStage {
WAITING,
STARTED,
ABORTED,
SUCCEEDED,
@ -41,18 +47,43 @@ public class AutoScaling {
AFTER_ACTION
}
public static interface TriggerListener {
public void triggerFired(Trigger trigger, Event event);
public static interface TriggerEvent<T extends Trigger> {
public T getSource();
public long getEventNanoTime();
public void setContext(Map<String, Object> context);
public Map<String, Object> getContext();
}
public static interface TriggerListener<E extends TriggerEvent<? extends Trigger>> {
public void triggerFired(E event);
}
public static class HttpCallbackListener implements TriggerListener {
@Override
public void triggerFired(Trigger trigger, Event event) {
public void triggerFired(TriggerEvent event) {
}
}
public static interface Trigger {
/**
* Interface for a Solr trigger. Each trigger implements the Runnable and Closeable interfaces. A trigger
* is scheduled using a {@link java.util.concurrent.ScheduledExecutorService} so that it runs on its
* configured schedule to check whether it is ready to fire. The {@link #setListener(TriggerListener)}
* method should be used to set a callback listener, which is invoked by the trigger implementation
* whenever it is ready to fire.
* <p>
* As per the guarantees made by the {@link java.util.concurrent.ScheduledExecutorService}, a trigger
* implementation is only ever invoked sequentially and therefore need not be thread safe. However,
* implementations are encouraged to be immutable, except for the associated listener, which may be
* get/set by a thread other than the one executing the trigger. Implementations should therefore
* use appropriate synchronization around the listener.
*
* @param <E> the {@link TriggerEvent} which is handled by this Trigger
*/
public static interface Trigger<E extends TriggerEvent<? extends Trigger>> extends Closeable, Runnable {
public String getName();
public EventType getEventType();
@ -60,14 +91,45 @@ public class AutoScaling {
public boolean isEnabled();
public Map<String, Object> getProperties();
public int getWaitForSecond();
public List<TriggerAction> getActions();
public void setListener(TriggerListener<E> listener);
public TriggerListener<E> getListener();
public boolean isClosed();
}
public static interface Event {
public String getSource();
public Date getTime();
public EventType getType();
public static class TriggerFactory implements Closeable {
private final CoreContainer coreContainer;
private boolean isClosed = false;
public TriggerFactory(CoreContainer coreContainer) {
this.coreContainer = coreContainer;
}
public synchronized Trigger create(EventType type, String name, Map<String, Object> props) {
if (isClosed) {
throw new AlreadyClosedException("TriggerFactory has already been closed, cannot create new triggers");
}
switch (type) {
case NODEADDED:
return new NodeAddedTrigger(name, props, coreContainer);
default:
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
}
}
@Override
public void close() throws IOException {
synchronized (this) {
isClosed = true;
}
}
}
}
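
Usage sketch (not part of this commit, illustrative only): it shows how the new TriggerFactory, Trigger and TriggerListener pieces fit together, assuming an already initialized CoreContainer. The class, trigger name and property values below are hypothetical; the property keys match the ones NodeAddedTrigger reads.

import java.util.HashMap;
import java.util.Map;
import org.apache.solr.cloud.autoscaling.AutoScaling;
import org.apache.solr.core.CoreContainer;

class TriggerUsageSketch {
  // Hypothetical helper, not part of the commit: creates a nodeAdded trigger and wires a listener to it.
  static AutoScaling.Trigger createNodeAddedTrigger(CoreContainer coreContainer) {
    Map<String, Object> props = new HashMap<>();
    props.put("event", "nodeAdded");
    props.put("waitFor", 30L);      // seconds a new node must stay live before the trigger fires
    props.put("enabled", "true");
    AutoScaling.TriggerFactory factory = new AutoScaling.TriggerFactory(coreContainer);
    AutoScaling.Trigger trigger = factory.create(AutoScaling.EventType.NODEADDED, "node_added_sketch", props);
    // The listener is invoked from the trigger's run() method once a tracked node has waited long enough.
    trigger.setListener(event -> System.out.println("Trigger fired: " + event));
    // In Solr the trigger is handed to ScheduledTriggers, which runs it on a fixed delay; it is not run directly.
    return trigger;
  }
}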

View File

@ -33,4 +33,19 @@ public class ComputePlanAction implements TriggerAction {
public void init(Map<String, String> args) {
}
@Override
public String getName() {
return null;
}
@Override
public String getClassName() {
return null;
}
@Override
public void process(AutoScaling.TriggerEvent event) {
}
}

View File

@ -33,4 +33,19 @@ public class ExecutePlanAction implements TriggerAction {
public void init(Map<String, String> args) {
}
@Override
public String getName() {
return null;
}
@Override
public String getClassName() {
return null;
}
@Override
public void process(AutoScaling.TriggerEvent event) {
}
}

View File

@ -33,4 +33,19 @@ public class LogPlanAction implements TriggerAction {
public void init(Map<String, String> args) {
}
@Override
public String getName() {
return null;
}
@Override
public String getClassName() {
return null;
}
@Override
public void process(AutoScaling.TriggerEvent event) {
}
}

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Trigger for the {@link org.apache.solr.cloud.autoscaling.AutoScaling.EventType#NODEADDED} event
*/
public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.NodeAddedEvent> {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String name;
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
private final AtomicReference<AutoScaling.TriggerListener<NodeAddedEvent>> listenerRef;
private boolean isClosed = false;
private Set<String> lastLiveNodes;
private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
public NodeAddedTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
this.name = name;
this.properties = properties;
this.container = container;
this.listenerRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
for (Map<String, String> map : o) {
TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class);
action.init(map);
actions.add(action);
}
} else {
actions = Collections.emptyList();
}
}
@Override
public void setListener(AutoScaling.TriggerListener<NodeAddedEvent> listener) {
listenerRef.set(listener);
}
@Override
public AutoScaling.TriggerListener<NodeAddedEvent> getListener() {
return listenerRef.get();
}
@Override
public String getName() {
return name;
}
@Override
public AutoScaling.EventType getEventType() {
return AutoScaling.EventType.valueOf((String) properties.get("event"));
}
@Override
public boolean isEnabled() {
return Boolean.parseBoolean((String) properties.getOrDefault("enabled", "true"));
}
@Override
public int getWaitForSecond() {
return ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public List<TriggerAction> getActions() {
return actions;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof NodeAddedTrigger) {
NodeAddedTrigger that = (NodeAddedTrigger) obj;
return this.name.equals(that.name)
&& this.properties.equals(that.properties);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(name, properties);
}
@Override
public void close() throws IOException {
synchronized (this) {
isClosed = true;
}
}
@Override
public void run() {
try {
synchronized (this) {
if (isClosed) {
log.warn("NodeAddedTrigger ran but was already closed");
throw new RuntimeException("Trigger has been closed");
}
}
log.debug("Running NodeAddedTrigger");
ZkStateReader reader = container.getZkController().getZkStateReader();
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
log.info("Found livenodes: " + newLiveNodes);
if (lastLiveNodes == null) {
lastLiveNodes = newLiveNodes;
return;
}
// have any nodes that we were tracking been removed from the cluster?
// if so, remove them from the tracking map
Set<String> trackingKeySet = nodeNameVsTimeAdded.keySet();
trackingKeySet.retainAll(newLiveNodes);
// have any new nodes been added?
Set<String> copyOfNew = new HashSet<>(newLiveNodes);
copyOfNew.removeAll(lastLiveNodes);
copyOfNew.forEach(n -> {
log.info("Tracking new node: {}", n);
nodeNameVsTimeAdded.put(n, System.nanoTime());
});
// has enough time expired to trigger events for a node?
// iterate with an explicit Iterator so tracked nodes can be removed safely while looping
Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeAdded.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeAdded = entry.getValue();
if (TimeUnit.SECONDS.convert(System.nanoTime() - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
if (listener != null) {
log.info("NodeAddedTrigger firing registered listener");
listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName));
}
it.remove();
}
}
lastLiveNodes = newLiveNodes;
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeAddedTrigger", e);
}
}
@Override
public boolean isClosed() {
synchronized (this) {
return isClosed;
}
}
public static class NodeAddedEvent implements AutoScaling.TriggerEvent<NodeAddedTrigger> {
private final NodeAddedTrigger source;
private final long nodeAddedNanoTime;
private final String nodeName;
private Map<String, Object> context;
public NodeAddedEvent(NodeAddedTrigger source, long nodeAddedNanoTime, String nodeAdded) {
this.source = source;
this.nodeAddedNanoTime = nodeAddedNanoTime;
this.nodeName = nodeAdded;
}
@Override
public NodeAddedTrigger getSource() {
return source;
}
@Override
public long getEventNanoTime() {
return nodeAddedNanoTime;
}
public String getNodeName() {
return nodeName;
}
public AutoScaling.EventType getType() {
return source.getEventType();
}
@Override
public void setContext(Map<String, Object> context) {
this.context = context;
}
@Override
public Map<String, Object> getContext() {
return context;
}
}
}
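
Configuration sketch (not part of this commit): the constructor above expects the optional "actions" property to be a list of maps, each naming a TriggerAction implementation; every entry is instantiated via the resource loader and then init(map) is called on it. The helper below is hypothetical and mirrors what NodeAddedTriggerTest builds further down.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NodeAddedTriggerPropsSketch {
  // Hypothetical helper, not part of the commit: builds the properties map consumed by NodeAddedTrigger.
  static Map<String, Object> nodeAddedTriggerProps(long waitForSeconds) {
    Map<String, Object> props = new HashMap<>();
    props.put("event", "nodeAdded");
    props.put("waitFor", waitForSeconds);  // read back by getWaitForSecond() as a Long
    props.put("enabled", "true");

    List<Map<String, String>> actions = new ArrayList<>();
    Map<String, String> computePlan = new HashMap<>();
    computePlan.put("name", "compute_plan");
    computePlan.put("class", "solr.ComputePlanAction");
    actions.add(computePlan);

    Map<String, String> logPlan = new HashMap<>();
    logPlan.put("name", "log_plan");
    logPlan.put("class", "solr.LogPlanAction");
    actions.add(logPlan);

    props.put("actions", actions);
    // new NodeAddedTrigger("node_added_trigger", props, coreContainer) would load and init both actions
    return props;
  }
}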

View File

@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.util.IOUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Overseer thread responsible for reading triggers from ZooKeeper and
* adding/removing them from {@link ScheduledTriggers}
*/
public class OverseerTriggerThread implements Runnable, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ZkController zkController;
private final ZkStateReader zkStateReader;
private final ScheduledTriggers scheduledTriggers;
private final AutoScaling.TriggerFactory triggerFactory;
private final ReentrantLock updateLock = new ReentrantLock();
private final Condition updated = updateLock.newCondition();
/*
Following variables are only accessed or modified when updateLock is held
*/
private int znodeVersion = -1;
private Map<String, AutoScaling.Trigger> activeTriggers = new HashMap<>();
private boolean isClosed = false;
public OverseerTriggerThread(ZkController zkController) {
this.zkController = zkController;
zkStateReader = zkController.getZkStateReader();
scheduledTriggers = new ScheduledTriggers();
triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
}
@Override
public void close() throws IOException {
updateLock.lock();
try {
isClosed = true;
activeTriggers.clear();
updated.signalAll();
} finally {
updateLock.unlock();
}
IOUtils.closeQuietly(triggerFactory);
IOUtils.closeQuietly(scheduledTriggers);
}
@Override
public void run() {
int lastZnodeVersion = znodeVersion;
SolrZkClient zkClient = zkStateReader.getZkClient();
createWatcher(zkClient);
while (true) {
Map<String, AutoScaling.Trigger> copy = null;
try {
updateLock.lockInterruptibly();
if (znodeVersion == lastZnodeVersion) {
updated.await();
// are we closed?
if (isClosed) break;
// spurious wakeup?
if (znodeVersion == lastZnodeVersion) continue;
lastZnodeVersion = znodeVersion;
}
copy = new HashMap<>(activeTriggers);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("Interrupted", e);
break;
} finally {
updateLock.unlock();
}
Set<String> managedTriggerNames = scheduledTriggers.getScheduledTriggerNames();
// remove the triggers which are no longer active
for (String managedTriggerName : managedTriggerNames) {
if (!copy.containsKey(managedTriggerName)) {
scheduledTriggers.remove(managedTriggerName);
}
}
// add new triggers and/or replace and close the replaced triggers
for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
scheduledTriggers.add(entry.getValue());
}
}
}
private void createWatcher(SolrZkClient zkClient) {
try {
zkClient.exists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// session events are not change events, and do not remove the watcher
if (Event.EventType.None.equals(watchedEvent.getType())) {
return;
}
updateLock.lock();
try {
if (isClosed) {
return;
}
final Stat stat = new Stat();
final byte[] data = zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, this, stat, true);
if (znodeVersion >= stat.getVersion()) {
// protect against reordered watcher fires by ensuring that we only move forward
return;
}
znodeVersion = stat.getVersion();
Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, data);
for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
String triggerName = entry.getKey();
AutoScaling.Trigger trigger = entry.getValue();
if (trigger.isEnabled()) {
activeTriggers.put(triggerName, trigger);
} else {
activeTriggers.remove(triggerName);
}
}
updated.signalAll();
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
log.error("A ZK error has occurred", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("Interrupted", e);
} catch (Exception e) {
log.error("Unexpected exception", e);
} finally {
updateLock.unlock();
}
}
}, true);
} catch (KeeperException e) {
log.error("Exception in OverseerTriggerThread", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("OverseerTriggerThread interrupted", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
private static Map<String, AutoScaling.Trigger> loadTriggers(AutoScaling.TriggerFactory triggerFactory, byte[] data) {
ZkNodeProps loaded = ZkNodeProps.load(data);
Map<String, Object> triggers = (Map<String, Object>) loaded.get("triggers");
Map<String, AutoScaling.Trigger> triggerMap = new HashMap<>(triggers.size());
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
Map<String, Object> props = (Map<String, Object>) entry.getValue();
String event = (String) props.get("event");
AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
String triggerName = entry.getKey();
triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, props));
}
return triggerMap;
}
}
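
Data sketch (not part of this commit): loadTriggers above deserializes the autoscaling configuration znode at ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, expecting a top-level "triggers" map keyed by trigger name. The JSON below is illustrative only; the exact values written by the autoscaling API handler may differ, and waitFor is shown as plain seconds because NodeAddedTrigger reads it back as a Long.

{
  "triggers": {
    "node_added_trigger": {
      "event": "nodeAdded",
      "waitFor": 30,
      "enabled": "true",
      "actions": [
        {"name": "compute_plan", "class": "solr.ComputePlanAction"},
        {"name": "execute_plan", "class": "solr.ExecutePlanAction"},
        {"name": "log_plan",     "class": "solr.LogPlanAction"}
      ]
    }
  }
}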

View File

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Responsible for scheduling active triggers, starting and stopping them and
* performing actions when they fire
*/
public class ScheduledTriggers implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
/**
* Thread pool for scheduling the triggers
*/
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
/**
* Single threaded executor to run the actions upon a trigger event
*/
private final ExecutorService actionExecutor;
private boolean isClosed = false;
public ScheduledTriggers() {
// todo make the core pool size configurable
// it is important to use more than one thread because a trigger that takes a long time to run can starve other scheduled triggers
// ideally we would have as many core threads as the number of triggers, but we don't know beforehand
// how many triggers there are, and that many threads would be instantiated and kept around idle even when
// not needed, which is wasteful. Hopefully 4 is a good compromise.
scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(4,
new DefaultSolrThreadFactory("ScheduledTrigger-"));
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
}
/**
* Adds a new trigger or replaces an existing one. The replaced trigger, if any, is closed
* <b>before</b> the new trigger is run. If a trigger is replaced with itself then this
* operation becomes a no-op.
*
* @param newTrigger the trigger to be managed
* @throws AlreadyClosedException if this class has already been closed
*/
public synchronized void add(AutoScaling.Trigger newTrigger) {
if (isClosed) {
throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
}
ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger);
ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger);
if (old != null) {
if (old.trigger.equals(newTrigger)) {
// the trigger wasn't actually modified so we do nothing
return;
}
IOUtils.closeQuietly(old);
scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
}
newTrigger.setListener(event -> {
AutoScaling.Trigger source = event.getSource();
if (source.isClosed()) {
log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed");
return;
}
List<TriggerAction> actions = source.getActions();
if (actions != null) {
actionExecutor.submit(() -> {
for (TriggerAction action : actions) {
try {
action.process(event);
} catch (Exception e) {
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
throw e;
}
}
});
}
});
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(newTrigger, 0, 1, TimeUnit.SECONDS);
}
/**
* Removes and stops the trigger with the given name
*
* @param triggerName the name of the trigger to be removed
* @throws AlreadyClosedException if this class has already been closed
*/
public synchronized void remove(String triggerName) {
if (isClosed) {
throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used any more");
}
ScheduledTrigger removed = scheduledTriggers.remove(triggerName);
IOUtils.closeQuietly(removed);
}
/**
* @return an unmodifiable set of names of all triggers being managed by this class
*/
public synchronized Set<String> getScheduledTriggerNames() {
return Collections.unmodifiableSet(scheduledTriggers.keySet());
}
@Override
public void close() throws IOException {
synchronized (this) {
// mark that we are closed
isClosed = true;
for (ScheduledTrigger scheduledTrigger : scheduledTriggers.values()) {
IOUtils.closeQuietly(scheduledTrigger);
}
scheduledTriggers.clear();
}
ExecutorUtil.shutdownAndAwaitTermination(scheduledThreadPoolExecutor);
ExecutorUtil.shutdownAndAwaitTermination(actionExecutor);
}
private static class ScheduledTrigger implements Closeable {
AutoScaling.Trigger trigger;
ScheduledFuture<?> scheduledFuture;
ScheduledTrigger(AutoScaling.Trigger trigger) {
this.trigger = trigger;
}
@Override
public void close() throws IOException {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
IOUtils.closeQuietly(trigger);
}
}
}
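
Lifecycle sketch (not part of this commit): in this change ScheduledTriggers is driven by OverseerTriggerThread, but on its own it can be exercised roughly as below; the driver method is hypothetical.

import java.io.IOException;
import org.apache.solr.cloud.autoscaling.AutoScaling;
import org.apache.solr.cloud.autoscaling.ScheduledTriggers;

class ScheduledTriggersSketch {
  // Hypothetical driver, not part of the commit: schedules one trigger, lets it poll, then tears everything down.
  static void runBriefly(AutoScaling.Trigger trigger, long millis) throws IOException, InterruptedException {
    ScheduledTriggers scheduledTriggers = new ScheduledTriggers();
    try {
      scheduledTriggers.add(trigger);               // schedules trigger.run() with a fixed 1 second delay
      Thread.sleep(millis);                         // give the trigger a few runs to observe cluster state
      scheduledTriggers.remove(trigger.getName());  // cancels the scheduled task and closes the trigger
    } finally {
      scheduledTriggers.close();                    // shuts down the scheduler and the action executor
    }
  }
}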

View File

@ -26,4 +26,9 @@ import org.apache.solr.util.plugin.MapInitializedPlugin;
*/
public interface TriggerAction extends MapInitializedPlugin, Closeable {
// todo nocommit
public String getName();
public String getClassName();
public void process(AutoScaling.TriggerEvent event);
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.core.CoreContainer;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test for {@link NodeAddedTrigger}
*/
public class NodeAddedTriggerTest extends SolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(1)
.addConfig("conf", configset("cloud-minimal"))
.configure();
}
@Test
public void test() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
Map<String, Object> props = new HashMap<>();
props.put("event", "nodeLost");
long waitForSeconds = 1 + random().nextInt(5);
props.put("waitFor", waitForSeconds);
props.put("enabled", "true");
List<Map<String, String>> actions = new ArrayList<>(3);
Map<String, String> map = new HashMap<>(2);
map.put("name", "compute_plan");
map.put("class", "solr.ComputePlanAction");
actions.add(map);
map = new HashMap<>(2);
map.put("name", "execute_plan");
map.put("class", "solr.ExecutePlanAction");
actions.add(map);
map = new HashMap<>(2);
map.put("name", "log_plan");
map.put("class", "solr.LogPlanAction");
actions.add(map);
props.put("actions", actions);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
trigger.setListener(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
fail("NodeAddedListener was fired before the configured waitFor period");
}
} else {
fail("NodeAddedTrigger was fired more than once!");
}
});
int counter = 0;
do {
trigger.run();
Thread.sleep(1000);
if (counter++ > 10) {
fail("Newly added node was not discovered by trigger even after 10 seconds");
}
} while (!fired.get());
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
assertNotNull(nodeAddedEvent);
assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
}
// add a new node but remove it before the waitFor period expires
// and assert that the trigger doesn't fire at all
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
final int waitTime = 2;
props.put("waitFor", waitTime);
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setListener(event -> {
if (fired.compareAndSet(false, true)) {
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
fail("NodeAddedListener was fired before the configured waitFor period");
}
} else {
fail("NodeAddedTrigger was fired more than once!");
}
});
trigger.run(); // first run should detect the new node
newNode.stop(); // stop the new jetty
int counter = 0;
do {
trigger.run();
Thread.sleep(1000);
if (counter++ > waitTime + 1) { // run it a little more than the wait time
break;
}
} while (true);
// ensure the event was not fired
assertFalse(fired.get());
}
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.NamedList;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An end-to-end integration test for triggers
*/
public class TriggerIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static CountDownLatch actionCreated = new CountDownLatch(1);
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
private static int waitForSeconds = 1;
private static AtomicBoolean triggerFired = new AtomicBoolean(false);
private static AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(1)
.addConfig("conf", configset("cloud-minimal"))
.configure();
waitForSeconds = 1 + random().nextInt(3);
}
@Test
public void testNodeAddedTrigger() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
// todo nocommit -- add testing for the v2 path
// String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
String path = "/admin/autoscaling";
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger'," +
"'event' : 'nodeAdded'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : 'true'," +
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
"}}";
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
fail("The TriggerAction should have been created by now");
}
JettySolrRunner newNode = cluster.startJettySolrRunner();
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
newNode.getNodeName(), nodeAddedEvent.getNodeName());
}
public static class TestTriggerAction implements TriggerAction {
public TestTriggerAction() {
log.info("TestTriggerAction instantiated");
actionCreated.countDown();
}
@Override
public String getName() {
return "TestTriggerAction";
}
@Override
public String getClassName() {
return this.getClass().getName();
}
@Override
public void process(AutoScaling.TriggerEvent event) {
if (triggerFired.compareAndSet(false, true)) {
eventRef.set((NodeAddedTrigger.NodeAddedEvent) event);
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
fail("NodeAddedListener was fired before the configured waitFor period");
}
triggerFiredLatch.countDown();
} else {
fail("NodeAddedTrigger was fired more than once!");
}
}
@Override
public void close() throws IOException {
}
@Override
public void init(Map<String, String> args) {
}
}
}