From 25ef04b714b22267de51d6c61abc771a4b970d20 Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Mon, 17 Apr 2017 12:18:32 +0530 Subject: [PATCH] SOLR-10376: Implement autoscaling trigger for nodeAdded event --- solr/CHANGES.txt | 2 + .../java/org/apache/solr/cloud/Overseer.java | 20 +- .../org/apache/solr/cloud/ZkController.java | 2 +- .../solr/cloud/autoscaling/AutoScaling.java | 82 +++++- .../cloud/autoscaling/ComputePlanAction.java | 15 ++ .../cloud/autoscaling/ExecutePlanAction.java | 15 ++ .../solr/cloud/autoscaling/LogPlanAction.java | 15 ++ .../cloud/autoscaling/NodeAddedTrigger.java | 238 ++++++++++++++++++ .../autoscaling/OverseerTriggerThread.java | 214 ++++++++++++++++ .../cloud/autoscaling/ScheduledTriggers.java | 171 +++++++++++++ .../solr/cloud/autoscaling/TriggerAction.java | 5 + .../autoscaling/NodeAddedTriggerTest.java | 133 ++++++++++ .../autoscaling/TriggerIntegrationTest.java | 130 ++++++++++ 13 files changed, 1030 insertions(+), 12 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java create mode 100644 solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java create mode 100644 solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 29f9c46a093..15f1151e98c 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -71,6 +71,8 @@ New Features * SOLR-10393: Adds UUID Streaming Evaluator (Dennis Gove) +* SOLR-10376: Implement autoscaling trigger for nodeAdded event. (shalin) + Bug Fixes ---------------------- * SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509. 
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 4d3cee7d737..4fe0fdbaaa2 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.codahale.metrics.Timer; import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.cloud.autoscaling.OverseerTriggerThread; import org.apache.solr.cloud.overseer.ClusterStateMutator; import org.apache.solr.cloud.overseer.CollectionMutator; import org.apache.solr.cloud.overseer.NodeMutator; @@ -464,6 +465,8 @@ public class Overseer implements Closeable { private OverseerThread arfoThread; + private OverseerThread triggerThread; + private final ZkStateReader reader; private final ShardHandler shardHandler; @@ -519,10 +522,15 @@ public class Overseer implements Closeable { OverseerAutoReplicaFailoverThread autoReplicaFailoverThread = new OverseerAutoReplicaFailoverThread(config, reader, updateShardHandler); arfoThread = new OverseerThread(ohcfTg, autoReplicaFailoverThread, "OverseerHdfsCoreFailoverThread-" + id); arfoThread.setDaemon(true); - + + ThreadGroup triggerThreadGroup = new ThreadGroup("Overseer autoscaling triggers"); + OverseerTriggerThread trigger = new OverseerTriggerThread(zkController); + triggerThread = new OverseerThread(triggerThreadGroup, trigger, "OverseerAutoScalingTriggerThread-" + id); + updaterThread.start(); ccThread.start(); arfoThread.start(); + triggerThread.start(); assert ObjectReleaseTracker.track(this); } @@ -567,6 +575,10 @@ public class Overseer implements Closeable { IOUtils.closeQuietly(arfoThread); arfoThread.interrupt(); } + if (triggerThread != null) { + IOUtils.closeQuietly(triggerThread); + triggerThread.interrupt(); + } if (updaterThread != null) { try { @@ -583,10 +595,16 @@ public class Overseer implements Closeable { arfoThread.join(); } catch 
(InterruptedException e) {} } + if (triggerThread != null) { + try { + triggerThread.join(); + } catch (InterruptedException e) {} + } updaterThread = null; ccThread = null; arfoThread = null; + triggerThread = null; } /** diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 5f779b22ce2..9a2a76f449e 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1818,7 +1818,7 @@ public class ZkController { } } - CoreContainer getCoreContainer() { + public CoreContainer getCoreContainer() { return cc; } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java index 560ce663fe4..eb98130b4b1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java @@ -17,9 +17,14 @@ package org.apache.solr.cloud.autoscaling; -import java.util.Date; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; import java.util.Map; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.solr.core.CoreContainer; + public class AutoScaling { public enum EventType { @@ -33,6 +38,7 @@ public class AutoScaling { } public enum TriggerStage { + WAITING, STARTED, ABORTED, SUCCEEDED, @@ -41,18 +47,43 @@ public class AutoScaling { AFTER_ACTION } - public static interface TriggerListener { - public void triggerFired(Trigger trigger, Event event); + public static interface TriggerEvent { + public T getSource(); + + public long getEventNanoTime(); + + public void setContext(Map context); + + public Map getContext(); + } + + public static interface TriggerListener> { + public void triggerFired(E event); } public static class HttpCallbackListener implements TriggerListener { @Override - public void 
triggerFired(Trigger trigger, Event event) { + public void triggerFired(TriggerEvent event) { } } - public static interface Trigger { + /** + * Interface for a Solr trigger. Each trigger implements Runnable and Closeable interface. A trigger + * is scheduled using a {@link java.util.concurrent.ScheduledExecutorService} so it is executed as + * per a configured schedule to check whether the trigger is ready to fire. The {@link #setListener(TriggerListener)} + * method should be used to set a callback listener which is fired by implementation of this class whenever + * ready. + *

+ * As per the guarantees made by the {@link java.util.concurrent.ScheduledExecutorService} a trigger + * implementation is only ever called sequentially and therefore need not be thread safe. However, it + * is encouraged that implementations be immutable with the exception of the associated listener + * which can be get/set by a different thread than the one executing the trigger. Therefore, implementations + * should use appropriate synchronization around the listener. + * + * @param the {@link TriggerEvent} which is handled by this Trigger + */ + public static interface Trigger> extends Closeable, Runnable { public String getName(); public EventType getEventType(); @@ -60,14 +91,45 @@ public class AutoScaling { public boolean isEnabled(); public Map getProperties(); + + public int getWaitForSecond(); + + public List getActions(); + + public void setListener(TriggerListener listener); + + public TriggerListener getListener(); + + public boolean isClosed(); } - public static interface Event { - public String getSource(); + public static class TriggerFactory implements Closeable { - public Date getTime(); + private final CoreContainer coreContainer; - public EventType getType(); + private boolean isClosed = false; + + public TriggerFactory(CoreContainer coreContainer) { + this.coreContainer = coreContainer; + } + + public synchronized Trigger create(EventType type, String name, Map props) { + if (isClosed) { + throw new AlreadyClosedException("TriggerFactory has already been closed, cannot create new triggers"); + } + switch (type) { + case NODEADDED: + return new NodeAddedTrigger(name, props, coreContainer); + default: + throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name); + } + } + + @Override + public void close() throws IOException { + synchronized (this) { + isClosed = true; + } + } } - } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java 
b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java index 79366d1cfbe..3e7aff60a1c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java @@ -33,4 +33,19 @@ public class ComputePlanAction implements TriggerAction { public void init(Map args) { } + + @Override + public String getName() { + return null; + } + + @Override + public String getClassName() { + return null; + } + + @Override + public void process(AutoScaling.TriggerEvent event) { + + } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java index 454530f7f3e..59509481baa 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java @@ -33,4 +33,19 @@ public class ExecutePlanAction implements TriggerAction { public void init(Map args) { } + + @Override + public String getName() { + return null; + } + + @Override + public String getClassName() { + return null; + } + + @Override + public void process(AutoScaling.TriggerEvent event) { + + } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java index 3a92eed7c3e..fc86c96da0d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java @@ -33,4 +33,19 @@ public class LogPlanAction implements TriggerAction { public void init(Map args) { } + + @Override + public String getName() { + return null; + } + + @Override + public String getClassName() { + return null; + } + + @Override + public void process(AutoScaling.TriggerEvent event) { + + } } diff --git 
a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java new file mode 100644 index 00000000000..1131161d058 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.autoscaling; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.core.CoreContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Trigger for the {@link org.apache.solr.cloud.autoscaling.AutoScaling.EventType#NODEADDED} event + */ +public class NodeAddedTrigger implements AutoScaling.Trigger { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final String name; + private final Map properties; + private final CoreContainer container; + private final List actions; + private final AtomicReference> listenerRef; + + private boolean isClosed = false; + + private Set lastLiveNodes; + + private Map nodeNameVsTimeAdded = new HashMap<>(); + + public NodeAddedTrigger(String name, Map properties, + CoreContainer container) { + this.name = name; + this.properties = properties; + this.container = container; + this.listenerRef = new AtomicReference<>(); + List> o = (List>) properties.get("actions"); + if (o != null && !o.isEmpty()) { + actions = new ArrayList<>(3); + for (Map map : o) { + TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class); + action.init(map); + actions.add(action); + } + } else { + actions = Collections.emptyList(); + } + } + + @Override + public void setListener(AutoScaling.TriggerListener listener) { + listenerRef.set(listener); + } + + @Override + public AutoScaling.TriggerListener getListener() { + return listenerRef.get(); + } + + @Override + public String getName() { + return name; + } + + 
@Override + public AutoScaling.EventType getEventType() { + return AutoScaling.EventType.valueOf((String) properties.get("event")); + } + + @Override + public boolean isEnabled() { + return Boolean.parseBoolean((String) properties.getOrDefault("enabled", "true")); + } + + @Override + public int getWaitForSecond() { + return ((Long) properties.getOrDefault("waitFor", -1L)).intValue(); + } + + @Override + public Map getProperties() { + return properties; + } + + @Override + public List getActions() { + return actions; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof NodeAddedTrigger) { + NodeAddedTrigger that = (NodeAddedTrigger) obj; + return this.name.equals(that.name) + && this.properties.equals(that.properties); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(name, properties); + } + + @Override + public void close() throws IOException { + synchronized (this) { + isClosed = true; + } + } + + @Override + public void run() { + try { + synchronized (this) { + if (isClosed) { + log.warn("NodeAddedTrigger ran but was already closed"); + throw new RuntimeException("Trigger has been closed"); + } + } + log.debug("Running NodeAddedTrigger"); + + ZkStateReader reader = container.getZkController().getZkStateReader(); + Set newLiveNodes = reader.getClusterState().getLiveNodes(); + log.info("Found livenodes: " + newLiveNodes); + if (lastLiveNodes == null) { + lastLiveNodes = newLiveNodes; + return; + } + + // have any nodes that we were tracking been removed from the cluster? + // if so, remove them from the tracking map + Set trackingKeySet = nodeNameVsTimeAdded.keySet(); + trackingKeySet.retainAll(newLiveNodes); + + // have any new nodes been added? 
+ Set copyOfNew = new HashSet<>(newLiveNodes); + copyOfNew.removeAll(lastLiveNodes); + copyOfNew.forEach(n -> { + log.info("Tracking new node: {}", n); + nodeNameVsTimeAdded.put(n, System.nanoTime()); + }); + + // has enough time expired to trigger events for a node? + for (Map.Entry entry : nodeNameVsTimeAdded.entrySet()) { + String nodeName = entry.getKey(); + Long timeAdded = entry.getValue(); + if (TimeUnit.SECONDS.convert(System.nanoTime() - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) { + // fire! + AutoScaling.TriggerListener listener = listenerRef.get(); + if (listener != null) { + log.info("NodeAddedTrigger firing registered listener"); + listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName)); + } + trackingKeySet.remove(nodeName); + } + } + + lastLiveNodes = newLiveNodes; + } catch (RuntimeException e) { + log.error("Unexpected exception in NodeAddedTrigger", e); + } + } + + @Override + public boolean isClosed() { + synchronized (this) { + return isClosed; + } + } + + public static class NodeAddedEvent implements AutoScaling.TriggerEvent { + private final NodeAddedTrigger source; + private final long nodeAddedNanoTime; + private final String nodeName; + + private Map context; + + public NodeAddedEvent(NodeAddedTrigger source, long nodeAddedNanoTime, String nodeAdded) { + this.source = source; + this.nodeAddedNanoTime = nodeAddedNanoTime; + this.nodeName = nodeAdded; + } + + @Override + public NodeAddedTrigger getSource() { + return source; + } + + @Override + public long getEventNanoTime() { + return nodeAddedNanoTime; + } + + public String getNodeName() { + return nodeName; + } + + public AutoScaling.EventType getType() { + return source.getEventType(); + } + + @Override + public void setContext(Map context) { + this.context = context; + } + + @Override + public Map getContext() { + return context; + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java 
b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java new file mode 100644 index 00000000000..ae4d459d347 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.autoscaling; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.solr.cloud.ZkController; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.cloud.ZooKeeperException; +import org.apache.solr.common.util.IOUtils; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Overseer thread responsible for reading triggers from zookeeper and + * adding/removing them from {@link ScheduledTriggers} + */ +public class OverseerTriggerThread implements Runnable, Closeable { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final ZkController zkController; + + private final ZkStateReader zkStateReader; + + private final ScheduledTriggers scheduledTriggers; + + private final AutoScaling.TriggerFactory triggerFactory; + + private final ReentrantLock updateLock = new ReentrantLock(); + + private final Condition updated = updateLock.newCondition(); + + /* + Following variables are only accessed or modified when updateLock is held + */ + private int znodeVersion = -1; + + private Map activeTriggers = new HashMap<>(); + + private boolean isClosed = false; + + public OverseerTriggerThread(ZkController zkController) { + this.zkController = zkController; + zkStateReader = zkController.getZkStateReader(); + scheduledTriggers = new ScheduledTriggers(); + triggerFactory = new 
AutoScaling.TriggerFactory(zkController.getCoreContainer()); + } + + @Override + public void close() throws IOException { + updateLock.lock(); + try { + isClosed = true; + activeTriggers.clear(); + updated.signalAll(); + } finally { + updateLock.unlock(); + } + IOUtils.closeQuietly(triggerFactory); + IOUtils.closeQuietly(scheduledTriggers); + } + + @Override + public void run() { + int lastZnodeVersion = znodeVersion; + SolrZkClient zkClient = zkStateReader.getZkClient(); + createWatcher(zkClient); + + while (true) { + Map copy = null; + try { + updateLock.lockInterruptibly(); + if (znodeVersion == lastZnodeVersion) { + updated.await(); + + // are we closed? + if (isClosed) break; + + // spurious wakeup? + if (znodeVersion == lastZnodeVersion) continue; + lastZnodeVersion = znodeVersion; + } + copy = new HashMap<>(activeTriggers); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + log.warn("Interrupted", e); + break; + } finally { + updateLock.unlock(); + } + + Set managedTriggerNames = scheduledTriggers.getScheduledTriggerNames(); + // remove the triggers which are no longer active + for (String managedTriggerName : managedTriggerNames) { + if (!copy.containsKey(managedTriggerName)) { + scheduledTriggers.remove(managedTriggerName); + } + } + // add new triggers and/or replace and close the replaced triggers + for (Map.Entry entry : copy.entrySet()) { + scheduledTriggers.add(entry.getValue()); + } + } + } + + private void createWatcher(SolrZkClient zkClient) { + try { + zkClient.exists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + // session events are not change events, and do not remove the watcher + if (Event.EventType.None.equals(watchedEvent.getType())) { + return; + } + updateLock.lock(); + + if (isClosed) { + return; + } + + try { + final Stat stat = new Stat(); + final byte[] data = 
zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, this, stat, true); + if (znodeVersion >= stat.getVersion()) { + // protect against reordered watcher fires by ensuring that we only move forward + return; + } + znodeVersion = stat.getVersion(); + Map triggerMap = loadTriggers(triggerFactory, data); + for (Map.Entry entry : triggerMap.entrySet()) { + String triggerName = entry.getKey(); + AutoScaling.Trigger trigger = entry.getValue(); + if (trigger.isEnabled()) { + activeTriggers.put(triggerName, trigger); + } else { + activeTriggers.remove(triggerName); + } + } + updated.signalAll(); + } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) { + log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage()); + } catch (KeeperException e) { + log.error("A ZK error has occurred", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + log.warn("Interrupted", e); + } catch (Exception e) { + log.error("Unexpected exception", e); + } finally { + updateLock.unlock(); + } + } + }, true); + } catch (KeeperException e) { + log.error("Exception in OverseerTriggerThread", e); + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + log.error("OverseerTriggerThread interrupted", e); + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + private static Map loadTriggers(AutoScaling.TriggerFactory triggerFactory, byte[] data) { + ZkNodeProps loaded = ZkNodeProps.load(data); + Map triggers = (Map) loaded.get("triggers"); + + Map triggerMap = new HashMap<>(triggers.size()); + + for (Map.Entry entry : triggers.entrySet()) { + Map props = (Map) entry.getValue(); + String event = (String) 
props.get("event"); + AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT)); + String triggerName = entry.getKey(); + triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, props)); + } + return triggerMap; + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java new file mode 100644 index 00000000000..22489013a26 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.autoscaling; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.IOUtils; +import org.apache.solr.util.DefaultSolrThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Responsible for scheduling active triggers, starting and stopping them and + * performing actions when they fire + */ +public class ScheduledTriggers implements Closeable { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final Map scheduledTriggers = new HashMap<>(); + + /** + * Thread pool for scheduling the triggers + */ + private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + + /** + * Single threaded executor to run the actions upon a trigger event + */ + private final ExecutorService actionExecutor; + + private boolean isClosed = false; + + public ScheduledTriggers() { + // todo make the core pool size configurable + // it is important to use more than one because a taking time trigger can starve other scheduled triggers + // ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand + // how many triggers we have and secondly, that many threads will always be instantiated and kept around idle + // so it is wasteful as well. Hopefully 4 is a good compromise. 
+ scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(4, + new DefaultSolrThreadFactory("ScheduledTrigger-")); + scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true); + scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor")); + } + + /** + * Adds a new trigger or replaces an existing one. The replaced trigger, if any, is closed + * before the new trigger is run. If a trigger is replaced with itself then this + * operation becomes a no-op. + * + * @param newTrigger the trigger to be managed + * @throws AlreadyClosedException if this class has already been closed + */ + public synchronized void add(AutoScaling.Trigger newTrigger) { + if (isClosed) { + throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore"); + } + ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger); + ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger); + if (old != null) { + if (old.trigger.equals(newTrigger)) { + // the trigger wasn't actually modified so we do nothing + return; + } + IOUtils.closeQuietly(old); + scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger); + } + newTrigger.setListener(event -> { + AutoScaling.Trigger source = event.getSource(); + if (source.isClosed()) { + log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed"); + return; + } + List actions = source.getActions(); + if (actions != null) { + actionExecutor.submit(() -> { + for (TriggerAction action : actions) { + try { + action.process(event); + } catch (Exception e) { + log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e); + throw e; + } + } + }); + } + }); + scheduledTrigger.scheduledFuture = 
scheduledThreadPoolExecutor.scheduleWithFixedDelay(newTrigger, 0, 1, TimeUnit.SECONDS); + } + + /** + * Removes and stops the trigger with the given name + * + * @param triggerName the name of the trigger to be removed + * @throws AlreadyClosedException if this class has already been closed + */ + public synchronized void remove(String triggerName) { + if (isClosed) { + throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used any more"); + } + ScheduledTrigger removed = scheduledTriggers.remove(triggerName); + IOUtils.closeQuietly(removed); + } + + /** + * @return an unmodifiable set of names of all triggers being managed by this class + */ + public synchronized Set getScheduledTriggerNames() { + return Collections.unmodifiableSet(scheduledTriggers.keySet()); + } + + @Override + public void close() throws IOException { + synchronized (this) { + // mark that we are closed + isClosed = true; + for (ScheduledTrigger scheduledTrigger : scheduledTriggers.values()) { + IOUtils.closeQuietly(scheduledTrigger); + } + scheduledTriggers.clear(); + } + ExecutorUtil.shutdownAndAwaitTermination(scheduledThreadPoolExecutor); + ExecutorUtil.shutdownAndAwaitTermination(actionExecutor); + } + + private static class ScheduledTrigger implements Closeable { + AutoScaling.Trigger trigger; + ScheduledFuture scheduledFuture; + + ScheduledTrigger(AutoScaling.Trigger trigger) { + this.trigger = trigger; + } + + @Override + public void close() throws IOException { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + IOUtils.closeQuietly(trigger); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java index 7765ac25601..242c9de1232 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java @@ -26,4 +26,9 @@ import 
org.apache.solr.util.plugin.MapInitializedPlugin; */ public interface TriggerAction extends MapInitializedPlugin, Closeable { // todo nocommit + public String getName(); + + public String getClassName(); + + public void process(AutoScaling.TriggerEvent event); } diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java new file mode 100644 index 00000000000..e603faaf148 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.core.CoreContainer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link NodeAddedTrigger}
+ */
+public class NodeAddedTriggerTest extends SolrCloudTestCase {
+  /** Starts a single-node cluster with the "cloud-minimal" configset registered as "conf". */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+  }
+
+  /**
+   * Drives a {@link NodeAddedTrigger} by hand (calling {@code run()} directly) through two
+   * scenarios: a node that is added and stays must produce exactly one event, and not before
+   * the configured waitFor period has passed; a node that is added and stopped again must
+   * produce no event at all.
+   */
+  @Test
+  public void test() throws Exception {
+    CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
+    Map<String, Object> props = new HashMap<>();
+    // this test exercises NodeAddedTrigger, so configure the matching event type
+    props.put("event", "nodeAdded");
+    long waitForSeconds = 1 + random().nextInt(5);
+    props.put("waitFor", waitForSeconds);
+    props.put("enabled", "true");
+    List<Map<String, String>> actions = new ArrayList<>(3);
+    Map<String, String> map = new HashMap<>(2);
+    map.put("name", "compute_plan");
+    map.put("class", "solr.ComputePlanAction");
+    actions.add(map);
+    map = new HashMap<>(2);
+    map.put("name", "execute_plan");
+    map.put("class", "solr.ExecutePlanAction");
+    actions.add(map);
+    map = new HashMap<>(2);
+    map.put("name", "log_plan");
+    map.put("class", "solr.LogPlanAction");
+    actions.add(map);
+    props.put("actions", actions);
+
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
+      trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+      trigger.run();
+
+      JettySolrRunner newNode = cluster.startJettySolrRunner();
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
+      trigger.setListener(event -> {
+        if (fired.compareAndSet(false, true)) {
+          eventRef.set(event);
+          if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
+            fail("NodeAddedListener was fired before the configured waitFor period");
+          }
+        } else {
+          fail("NodeAddedTrigger was fired more than once!");
+        }
+      });
+      // poll the trigger roughly once per second until the event is delivered
+      int counter = 0;
+      do {
+        trigger.run();
+        Thread.sleep(1000);
+        if (counter++ > 10) {
+          fail("Newly added node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
+      assertNotNull(nodeAddedEvent);
+      assertEquals("The node added trigger was fired but for a different node",
+          newNode.getNodeName(), nodeAddedEvent.getNodeName());
+    }
+
+    // add a new node but remove it before the waitFor period expires
+    // and assert that the trigger doesn't fire at all
+    // The wait time is put into the properties before the trigger is constructed so that the
+    // trigger is created from the same value that the listener and the polling loop assume.
+    // It is a long, like the waitFor value used for the first trigger above.
+    final long waitTime = 2;
+    props.put("waitFor", waitTime);
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
+      trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+      trigger.run();
+
+      JettySolrRunner newNode = cluster.startJettySolrRunner();
+      AtomicBoolean fired = new AtomicBoolean(false);
+      trigger.setListener(event -> {
+        if (fired.compareAndSet(false, true)) {
+          if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
+            fail("NodeAddedListener was fired before the configured waitFor period");
+          }
+        } else {
+          fail("NodeAddedTrigger was fired more than once!");
+        }
+      });
+      trigger.run(); // first run should detect the new node
+      newNode.stop(); // stop the new jetty
+      int counter = 0;
+      do {
+        trigger.run();
+        Thread.sleep(1000);
+        if (counter++ > waitTime + 1) { // run it a little more than the wait time
+          break;
+        }
+      } while (true);
+
+      // ensure the event was not fired
+      assertFalse(fired.get());
+    }
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java new file mode 100644 index 00000000000..cb92680ce61 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.util.NamedList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An end-to-end integration test for triggers
+ */
+public class TriggerIntegrationTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // released by the TestTriggerAction constructor
+  private static final CountDownLatch actionCreated = new CountDownLatch(1);
+  // released by TestTriggerAction.process once the first event has been accepted
+  private static final CountDownLatch triggerFiredLatch = new CountDownLatch(1);
+  // re-randomized in setupCluster()
+  private static int waitForSeconds = 1;
+  private static final AtomicBoolean triggerFired = new AtomicBoolean(false);
+  private static final AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
+
+  /**
+   * Starts a single-node cluster with the "cloud-minimal" configset and picks a random
+   * waitFor period between 1 and 3 seconds.
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    waitForSeconds = 1 + random().nextInt(3);
+  }
+
+  /**
+   * Registers a nodeAdded trigger whose only action is {@link TestTriggerAction}, starts a
+   * new node and verifies that the action receives one event naming that node.
+   */
+  @Test
+  public void testNodeAddedTrigger() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    // todo nocommit -- add testing for the v2 path
+    // String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
+    String path = "/admin/autoscaling";
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : 'true'," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    // expected value first, so that a failure is reported the right way around
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionCreated.await(3, TimeUnit.SECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
+    assertNotNull(nodeAddedEvent);
+    assertEquals("The node added trigger was fired but for a different node",
+        newNode.getNodeName(), nodeAddedEvent.getNodeName());
+  }
+
+  /**
+   * Test action that reports its own creation and the first event it receives through the
+   * static latches and references of the enclosing test class.
+   */
+  public static class TestTriggerAction implements TriggerAction {
+
+    public TestTriggerAction() {
+      log.info("TestTriggerAction instantiated");
+      actionCreated.countDown();
+    }
+
+    @Override
+    public String getName() {
+      return "TestTriggerAction";
+    }
+
+    @Override
+    public String getClassName() {
+      return this.getClass().getName();
+    }
+
+    /**
+     * Records the first event and releases {@code triggerFiredLatch}; fails on any further
+     * event or on an event delivered before the waitFor period has passed.
+     */
+    @Override
+    public void process(AutoScaling.TriggerEvent event) {
+      if (triggerFired.compareAndSet(false, true)) {
+        eventRef.set((NodeAddedTrigger.NodeAddedEvent) event);
+        if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
+          // NOTE(review): fail() throws before the latch below is released, so a premature
+          // event would presumably show up in the test as a latch timeout ("did not fire
+          // at all") if this runs on a thread other than the test thread -- confirm.
+          fail("NodeAddedListener was fired before the configured waitFor period");
+        }
+        triggerFiredLatch.countDown();
+      } else {
+        fail("NodeAddedTrigger was fired more than once!");
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+
+    }
+  }
+}