SOLR-10996: Implement TriggerListener API.

This commit is contained in:
Andrzej Bialecki 2017-07-10 18:29:15 +02:00
parent b8c86d24d4
commit 9c8e829f58
22 changed files with 1170 additions and 203 deletions

View File

@ -58,6 +58,7 @@ New Features
----------------------
* SOLR-11019: Add addAll Stream Evaluator (Joel Bernstein)
* SOLR-10996: Implement TriggerListener API (ab, shalin)
Bug Fixes
----------------------

View File

@ -38,7 +38,7 @@ public class AutoScaling {
INDEXRATE
}
public enum TriggerStage {
public enum EventProcessorStage {
WAITING,
STARTED,
ABORTED,
@ -48,29 +48,26 @@ public class AutoScaling {
AFTER_ACTION
}
public interface TriggerListener {
/**
* This method is executed when a trigger is ready to fire.
* Implementation of this interface is used for processing events generated by a trigger.
*/
public interface EventProcessor {
/**
* This method is executed for events produced by {@link Trigger#run()}.
*
* @param event a subclass of {@link TriggerEvent}
* @return true if the listener was ready to perform actions on the event, false
* @return true if the processor was ready to perform actions on the event, false
* otherwise. If false was returned then callers should assume the event was discarded.
*/
boolean triggerFired(TriggerEvent event);
}
public static class HttpCallbackListener implements TriggerListener {
@Override
public boolean triggerFired(TriggerEvent event) {
return true;
}
boolean process(TriggerEvent event);
}
/**
* Interface for a Solr trigger. Each trigger implements Runnable and Closeable interface. A trigger
* is scheduled using a {@link java.util.concurrent.ScheduledExecutorService} so it is executed as
* per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setListener(TriggerListener)}
* method should be used to set a callback listener which is fired by implementation of this class whenever
* per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setProcessor(EventProcessor)}
* method should be used to set a processor which is used by implementation of this class whenever
* ready.
* <p>
* As per the guarantees made by the {@link java.util.concurrent.ScheduledExecutorService} a trigger
@ -79,7 +76,7 @@ public class AutoScaling {
* which can be get/set by a different thread than the one executing the trigger. Therefore, implementations
* should use appropriate synchronization around the listener.
* <p>
* When a trigger is ready to fire, it calls the {@link TriggerListener#triggerFired(TriggerEvent)} event
* When a trigger is ready to fire, it calls the {@link EventProcessor#process(TriggerEvent)} event
* with the proper trigger event object. If that method returns false then it should be interpreted to mean
* that Solr is not ready to process this trigger event and therefore we should retain the state and fire
* at the next invocation of the run() method.
@ -107,11 +104,11 @@ public class AutoScaling {
/** Actions to execute when event is fired. */
List<TriggerAction> getActions();
/** Set event listener to call when event is fired. */
void setListener(TriggerListener listener);
/** Set event processor to call when event is fired. */
void setProcessor(EventProcessor processor);
/** Get event listener. */
TriggerListener getListener();
/** Get event processor. */
EventProcessor getProcessor();
/** Return true when this trigger is closed and cannot be used. */
boolean isClosed();

View File

@ -16,44 +16,79 @@
*/
package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple bean representation of <code>autoscaling.json</code>, which parses data
* lazily.
*/
public class AutoScalingConfig {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, Object> jsonMap;
private Policy policy;
private Map<String, TriggerConfig> triggers;
private Map<String, ListenerConfig> listeners;
private Map<String, TriggerListenerConfig> listeners;
/**
* Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.TriggerListener} config.
* Bean representation of {@link TriggerListener} config.
*/
public static class ListenerConfig {
public String trigger;
public List<String> stages;
public String listenerClass;
public List<Map<String, String>> beforeActions;
public List<Map<String, String>> afterActions;
public static class TriggerListenerConfig {
public final String name;
public final String trigger;
public final EnumSet<AutoScaling.EventProcessorStage> stages = EnumSet.noneOf(AutoScaling.EventProcessorStage.class);
public final String listenerClass;
public final Set<String> beforeActions;
public final Set<String> afterActions;
public final Map<String, Object> properties = new HashMap<>();
public ListenerConfig(Map<String, Object> properties) {
public TriggerListenerConfig(String name, Map<String, Object> properties) {
this.name = name;
this.properties.putAll(properties);
trigger = (String)properties.get(AutoScalingParams.TRIGGER);
stages = (List<String>)properties.getOrDefault(AutoScalingParams.STAGE, Collections.emptyList());
List<String> stageNames = getList(AutoScalingParams.STAGE, properties);
for (String stageName : stageNames) {
try {
AutoScaling.EventProcessorStage stage = AutoScaling.EventProcessorStage.valueOf(stageName.toUpperCase(Locale.ROOT));
stages.add(stage);
} catch (Exception e) {
LOG.warn("Invalid stage name '" + name + "' in listener config, skipping: " + properties);
}
}
listenerClass = (String)properties.get(AutoScalingParams.CLASS);
beforeActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.BEFORE_ACTION, Collections.emptyList());
afterActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.AFTER_ACTION, Collections.emptyList());
beforeActions = new HashSet<>(getList(AutoScalingParams.BEFORE_ACTION, properties));
afterActions = new HashSet<>(getList(AutoScalingParams.AFTER_ACTION, properties));
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TriggerListenerConfig that = (TriggerListenerConfig) o;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
if (!stages.equals(that.stages)) return false;
if (listenerClass != null ? !listenerClass.equals(that.listenerClass) : that.listenerClass != null) return false;
if (!beforeActions.equals(that.beforeActions)) return false;
if (!afterActions.equals(that.afterActions)) return false;
return properties.equals(that.properties);
}
}
@ -61,24 +96,49 @@ public class AutoScalingConfig {
* Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} config.
*/
public static class TriggerConfig {
public final String name;
public final AutoScaling.EventType eventType;
public final Map<String, Object> properties = new HashMap<>();
public TriggerConfig(Map<String, Object> properties) {
public TriggerConfig(String name, Map<String, Object> properties) {
this.name = name;
String event = (String) properties.get(AutoScalingParams.EVENT);
this.eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
this.properties.putAll(properties);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TriggerConfig that = (TriggerConfig) o;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (eventType != that.eventType) return false;
return properties.equals(that.properties);
}
}
public AutoScalingConfig(byte[] utf8) {
this(utf8 != null && utf8.length > 0 ? (Map<String, Object>)Utils.fromJSON(utf8) : Collections.emptyMap());
}
/**
* Construct from a JSON map representation.
* @param jsonMap
* @param jsonMap JSON map representation of the config.
*/
public AutoScalingConfig(Map<String, Object> jsonMap) {
this.jsonMap = Utils.getDeepCopy(jsonMap, 10);
}
/**
* Return the original JSON map representation that was used for building this config.
*/
public Map<String, Object> getJsonMap() {
return jsonMap;
}
/**
* Get {@link Policy} configuration.
*/
@ -100,31 +160,13 @@ public class AutoScalingConfig {
} else {
triggers = new HashMap<>(trigMap.size());
for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
triggers.put(entry.getKey(), new TriggerConfig((Map<String, Object>)entry.getValue()));
triggers.put(entry.getKey(), new TriggerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
}
}
}
return triggers;
}
/**
* Get listener configurations.
*/
public Map<String, ListenerConfig> getListenerConfigs() {
if (listeners == null) {
Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
if (map == null) {
listeners = Collections.emptyMap();
} else {
listeners = new HashMap<>(map.size());
for (Map.Entry<String, Object> entry : map.entrySet()) {
listeners.put(entry.getKey(), new ListenerConfig((Map<String, Object>)entry.getValue()));
}
}
}
return listeners;
}
/**
* Check whether triggers for specific event type exist.
* @param types list of event types
@ -144,4 +186,42 @@ public class AutoScalingConfig {
}
return false;
}
/**
* Get listener configurations.
*/
public Map<String, TriggerListenerConfig> getTriggerListenerConfigs() {
if (listeners == null) {
Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
if (map == null) {
listeners = Collections.emptyMap();
} else {
listeners = new HashMap<>(map.size());
for (Map.Entry<String, Object> entry : map.entrySet()) {
listeners.put(entry.getKey(), new TriggerListenerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
}
}
}
return listeners;
}
private static List<String> getList(String key, Map<String, Object> properties) {
return getList(key, properties, null);
}
private static List<String> getList(String key, Map<String, Object> properties, List<String> defaultList) {
if (defaultList == null) {
defaultList = Collections.emptyList();
}
Object o = properties.get(key);
if (o == null) {
return defaultList;
}
if (o instanceof List) {
return (List)o;
} else {
return Collections.singletonList(String.valueOf(o));
}
}
}

View File

@ -379,7 +379,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
for (String stage : stageNames) {
try {
AutoScaling.TriggerStage.valueOf(stage);
AutoScaling.EventProcessorStage.valueOf(stage);
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid stage name: " + stage);
}
@ -391,7 +391,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
// validate that we can load the listener class
// todo nocommit -- what about MemClassLoader?
try {
container.getResourceLoader().findClass(listenerClass, AutoScaling.TriggerListener.class);
container.getResourceLoader().findClass(listenerClass, TriggerListener.class);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Listener not found: " + listenerClass, e);
}

View File

@ -42,26 +42,9 @@ import org.slf4j.LoggerFactory;
* The cluster operations computed here are put into the {@link ActionContext}'s properties
* with the key name "operations". The value is a List of SolrRequest objects.
*/
public class ComputePlanAction implements TriggerAction {
public class ComputePlanAction extends TriggerActionBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private Map<String, String> initArgs;
@Override
public void close() throws IOException {
}
@Override
public void init(Map<String, String> args) {
this.initArgs = args;
}
@Override
public String getName() {
return initArgs.get("name");
}
@Override
public void process(TriggerEvent event, ActionContext context) {
log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
@ -38,26 +37,9 @@ import org.slf4j.LoggerFactory;
* This class is responsible for executing cluster operations read from the {@link ActionContext}'s properties
* with the key name "operations"
*/
public class ExecutePlanAction implements TriggerAction {
public class ExecutePlanAction extends TriggerActionBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private Map<String, String> initArgs;
@Override
public void close() throws IOException {
}
@Override
public void init(Map<String, String> args) {
this.initArgs = args;
}
@Override
public String getName() {
return initArgs.get("name");
}
@Override
public void process(TriggerEvent event, ActionContext context) {
log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());

View File

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.PropertiesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple HTTP callback that POSTs event data to a URL.
* URL, payload and headers may contain property substitution patterns, with the following properties available:
* <ul>
* <li>config.* - listener configuration</li>
* <li>event.* - event properties</li>
* <li>stage - current stage of event processing</li>
* <li>actionName - optional current action name</li>
* <li>context.* - optional {@link ActionContext} properties</li>
* <li>error - optional error string (from {@link Throwable#toString()})</li>
* <li>message - optional message</li>
* </ul>
* The following listener configuration is supported:
* <ul>
* <li>url - a URL template</li>
* <li>payload - string, optional payload template. If absent a JSON map of all properties listed above will be used.</li>
* <li>contentType - string, optional payload content type. If absent then <code>application/json</code> will be used.</li>
* <li>header.* - string, optional header template(s). The name of the property without "header." prefix defines the literal header name.</li>
* <li>timeout - int, optional connection and socket timeout in milliseconds. Default is 60 seconds.</li>
* <li>followRedirects - boolean, optional setting to follow redirects. Default is false.</li>
* </ul>
*/
public class HttpTriggerListener extends TriggerListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private HttpClient httpClient;
private String urlTemplate;
private String payloadTemplate;
private String contentType;
private Map<String, String> headerTemplates = new HashMap<>();
private int timeout = HttpClientUtil.DEFAULT_CONNECT_TIMEOUT;
private boolean followRedirects;
@Override
public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
super.init(coreContainer, config);
httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
urlTemplate = (String)config.properties.get("url");
payloadTemplate = (String)config.properties.get("payload");
contentType = (String)config.properties.get("contentType");
config.properties.forEach((k, v) -> {
if (k.startsWith("header.")) {
headerTemplates.put(k.substring(7), String.valueOf(v));
}
});
timeout = PropertiesUtil.toInteger(String.valueOf(config.properties.get("timeout")), HttpClientUtil.DEFAULT_CONNECT_TIMEOUT);
followRedirects = PropertiesUtil.toBoolean(String.valueOf(config.properties.get("followRedirects")));
}
@Override
public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) {
Properties properties = new Properties();
properties.setProperty("stage", stage.toString());
// if configuration used "actionName" but we're in a non-action related stage then PropertiesUtil will
// throws an exception on missing value - so replace it with an empty string
if (actionName == null) {
actionName = "";
}
properties.setProperty("actionName", actionName);
if (context != null) {
context.getProperties().forEach((k, v) -> {
properties.setProperty("context." + k, String.valueOf(v));
});
}
if (error != null) {
properties.setProperty("error", error.toString());
} else {
properties.setProperty("error", "");
}
if (message != null) {
properties.setProperty("message", message);
} else {
properties.setProperty("message", "");
}
// add event properties
properties.setProperty("event.id", event.getId());
properties.setProperty("event.source", event.getSource());
properties.setProperty("event.eventTime", String.valueOf(event.eventTime));
properties.setProperty("event.eventType", event.getEventType().toString());
event.getProperties().forEach((k, v) -> {
properties.setProperty("event.properties." + k, String.valueOf(v));
});
// add config properties
properties.setProperty("config.name", config.name);
properties.setProperty("config.trigger", config.trigger);
properties.setProperty("config.listenerClass", config.listenerClass);
properties.setProperty("config.beforeActions", String.join(",", config.beforeActions));
properties.setProperty("config.afterActions", String.join(",", config.afterActions));
StringJoiner joiner = new StringJoiner(",");
config.stages.forEach(s -> joiner.add(s.toString()));
properties.setProperty("config.stages", joiner.toString());
config.properties.forEach((k, v) -> {
properties.setProperty("config.properties." + k, String.valueOf(v));
});
String url = PropertiesUtil.substituteProperty(urlTemplate, properties);
String payload;
String type;
if (payloadTemplate != null) {
payload = PropertiesUtil.substituteProperty(payloadTemplate, properties);
if (contentType != null) {
type = contentType;
} else {
type = "application/json";
}
} else {
payload = Utils.toJSONString(properties);
type = "application/json";
}
HttpPost post = new HttpPost(url);
HttpEntity entity = new StringEntity(payload, "UTF-8");
headerTemplates.forEach((k, v) -> {
String headerVal = PropertiesUtil.substituteProperty(v, properties);
if (!headerVal.isEmpty()) {
post.addHeader(k, headerVal);
}
});
post.setEntity(entity);
post.setHeader("Content-Type", type);
org.apache.http.client.config.RequestConfig.Builder requestConfigBuilder = HttpClientUtil.createDefaultRequestConfigBuilder();
requestConfigBuilder.setSocketTimeout(timeout);
requestConfigBuilder.setConnectTimeout(timeout);
requestConfigBuilder.setRedirectsEnabled(followRedirects);
post.setConfig(requestConfigBuilder.build());
try {
HttpClientContext httpClientRequestContext = HttpClientUtil.createNewHttpClientRequestContext();
HttpResponse rsp = httpClient.execute(post, httpClientRequestContext);
int statusCode = rsp.getStatusLine().getStatusCode();
if (statusCode != 200) {
LOG.warn("Error sending request for event " + event + ", HTTP response: " + rsp.toString());
}
HttpEntity responseEntity = rsp.getEntity();
Utils.consumeFully(responseEntity);
} catch (IOException e) {
LOG.warn("Exception sending request for event " + event, e);
}
}
}

View File

@ -17,28 +17,10 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.Map;
/**
* todo nocommit
*/
public class LogPlanAction implements TriggerAction {
@Override
public void close() throws IOException {
}
@Override
public void init(Map<String, String> args) {
}
@Override
public String getName() {
return null;
}
public class LogPlanAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext actionContext) {

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of {@link TriggerListener} that reports
* events to a log.
*/
public class LogTriggerListener extends TriggerListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
Throwable error, String message) {
LOG.info("{}: stage={}, actionName={}, event={}, error={}, messsage={}", config.name, stage, actionName, event, error, message);
}
}

View File

@ -52,7 +52,7 @@ public class NodeAddedTrigger extends TriggerBase {
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
private final AtomicReference<AutoScaling.TriggerListener> listenerRef;
private final AtomicReference<AutoScaling.EventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
private final AutoScaling.EventType eventType;
@ -71,7 +71,7 @@ public class NodeAddedTrigger extends TriggerBase {
this.properties = properties;
this.container = container;
this.timeSource = TimeSource.CURRENT_TIME;
this.listenerRef = new AtomicReference<>();
this.processorRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
@ -119,13 +119,13 @@ public class NodeAddedTrigger extends TriggerBase {
}
@Override
public void setListener(AutoScaling.TriggerListener listener) {
listenerRef.set(listener);
public void setProcessor(AutoScaling.EventProcessor processor) {
processorRef.set(processor);
}
@Override
public AutoScaling.TriggerListener getListener() {
return listenerRef.get();
public AutoScaling.EventProcessor getProcessor() {
return processorRef.get();
}
@Override
@ -254,10 +254,10 @@ public class NodeAddedTrigger extends TriggerBase {
long now = timeSource.getTime();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
AutoScaling.TriggerListener listener = listenerRef.get();
if (listener != null) {
log.debug("NodeAddedTrigger {} firing registered listener for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
AutoScaling.EventProcessor processor = processorRef.get();
if (processor != null) {
log.debug("NodeAddedTrigger {} firing registered processor for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
if (processor.process(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
// remove from tracking set only if the fire was accepted
it.remove();
removeNodeAddedMarker(nodeName);

View File

@ -52,7 +52,7 @@ public class NodeLostTrigger extends TriggerBase {
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
private final AtomicReference<AutoScaling.TriggerListener> listenerRef;
private final AtomicReference<AutoScaling.EventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
private final AutoScaling.EventType eventType;
@ -71,7 +71,7 @@ public class NodeLostTrigger extends TriggerBase {
this.properties = properties;
this.container = container;
this.timeSource = TimeSource.CURRENT_TIME;
this.listenerRef = new AtomicReference<>();
this.processorRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
@ -117,13 +117,13 @@ public class NodeLostTrigger extends TriggerBase {
}
@Override
public void setListener(AutoScaling.TriggerListener listener) {
listenerRef.set(listener);
public void setProcessor(AutoScaling.EventProcessor processor) {
processorRef.set(processor);
}
@Override
public AutoScaling.TriggerListener getListener() {
return listenerRef.get();
public AutoScaling.EventProcessor getProcessor() {
return processorRef.get();
}
@Override
@ -249,10 +249,10 @@ public class NodeLostTrigger extends TriggerBase {
Long timeRemoved = entry.getValue();
if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
AutoScaling.TriggerListener listener = listenerRef.get();
if (listener != null) {
log.debug("NodeLostTrigger firing registered listener for lost node: {}", nodeName);
if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
AutoScaling.EventProcessor processor = processorRef.get();
if (processor != null) {
log.debug("NodeLostTrigger firing registered processor for lost node: {}", nodeName);
if (processor.process(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
it.remove();
removeNodeLostMarker(nodeName);
} else {

View File

@ -37,6 +37,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@ -75,6 +76,8 @@ public class OverseerTriggerThread implements Runnable, Closeable {
private boolean isClosed = false;
private AutoScalingConfig autoScalingConfig;
public OverseerTriggerThread(ZkController zkController) {
this.zkController = zkController;
zkStateReader = zkController.getZkStateReader();
@ -163,6 +166,9 @@ public class OverseerTriggerThread implements Runnable, Closeable {
break;
}
// update the current config
scheduledTriggers.setAutoScalingConfig(autoScalingConfig);
Set<String> managedTriggerNames = scheduledTriggers.getScheduledTriggerNames();
// remove the triggers which are no longer active
for (String managedTriggerName : managedTriggerNames) {
@ -265,8 +271,9 @@ public class OverseerTriggerThread implements Runnable, Closeable {
// protect against reordered watcher fires by ensuring that we only move forward
return;
}
autoScalingConfig = new AutoScalingConfig(data);
znodeVersion = stat.getVersion();
Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, data);
Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, autoScalingConfig);
// remove all active triggers that have been removed from ZK
Set<String> trackingKeySet = activeTriggers.keySet();
@ -288,22 +295,19 @@ public class OverseerTriggerThread implements Runnable, Closeable {
}
}
private static Map<String, AutoScaling.Trigger> loadTriggers(AutoScaling.TriggerFactory triggerFactory, byte[] data) {
ZkNodeProps loaded = ZkNodeProps.load(data);
Map<String, Object> triggers = (Map<String, Object>) loaded.get("triggers");
private static Map<String, AutoScaling.Trigger> loadTriggers(AutoScaling.TriggerFactory triggerFactory, AutoScalingConfig autoScalingConfig) {
Map<String, AutoScalingConfig.TriggerConfig> triggers = autoScalingConfig.getTriggerConfigs();
if (triggers == null) {
return Collections.emptyMap();
}
Map<String, AutoScaling.Trigger> triggerMap = new HashMap<>(triggers.size());
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
Map<String, Object> props = (Map<String, Object>) entry.getValue();
String event = (String) props.get(AutoScalingParams.EVENT);
AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
AutoScalingConfig.TriggerConfig cfg = entry.getValue();
AutoScaling.EventType eventType = cfg.eventType;
String triggerName = entry.getKey();
triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, props));
triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, cfg.properties));
}
return triggerMap;
}

View File

@ -24,7 +24,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -34,6 +36,8 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.cloud.ActionThrottle;
@ -85,6 +89,10 @@ public class ScheduledTriggers implements Closeable {
private final CoreContainer coreContainer;
private final TriggerListeners listeners;
private AutoScalingConfig autoScalingConfig;
public ScheduledTriggers(ZkController zkController) {
// todo make the core pool size configurable
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
@ -98,9 +106,20 @@ public class ScheduledTriggers implements Closeable {
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
// todo make the wait time configurable
actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
this.coreContainer = zkController.getCoreContainer();
this.zkClient = zkController.getZkClient();
coreContainer = zkController.getCoreContainer();
zkClient = zkController.getZkClient();
queueStats = new Overseer.Stats();
listeners = new TriggerListeners();
}
/**
* Set the current autoscaling config. This is invoked by {@link OverseerTriggerThread} when autoscaling.json is updated,
* and it re-initializes trigger listeners.
* @param autoScalingConfig current autoscaling.json
*/
public void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
this.autoScalingConfig = autoScalingConfig;
listeners.setAutoScalingConfig(autoScalingConfig);
}
/**
@ -127,20 +146,25 @@ public class ScheduledTriggers implements Closeable {
scheduledTrigger.setReplay(false);
scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
}
newTrigger.setListener(event -> {
newTrigger.setProcessor(event -> {
ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
if (scheduledSource == null) {
log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + event.getSource() + " doesn't exist.");
String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, msg);
log.warn(msg);
return false;
}
boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
AutoScaling.Trigger source = scheduledSource.trigger;
if (source.isClosed()) {
log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + source + " has already been closed");
String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.ABORTED, msg);
log.warn(msg);
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
return false;
}
if (hasPendingActions.compareAndSet(false, true)) {
listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.STARTED);
final boolean enqueued;
if (replaying) {
enqueued = false;
@ -158,17 +182,21 @@ public class ScheduledTriggers implements Closeable {
actionThrottle.markAttemptingAction();
ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>());
for (TriggerAction action : actions) {
listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
try {
action.process(event, actionContext);
} catch (Exception e) {
listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, action.getName(), actionContext, e, null);
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
throw e;
}
listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
}
if (enqueued) {
TriggerEvent ev = scheduledTrigger.dequeue();
assert ev.getId().equals(event.getId());
}
listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
} finally {
hasPendingActions.set(false);
}
@ -181,6 +209,7 @@ public class ScheduledTriggers implements Closeable {
+ " is broken! Expected event=" + event + " but got " + ev);
}
}
listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
hasPendingActions.set(false);
}
return true;
@ -252,6 +281,7 @@ public class ScheduledTriggers implements Closeable {
// guarantee about cluster state
scheduledThreadPoolExecutor.shutdownNow();
actionExecutor.shutdownNow();
listeners.close();
}
private class ScheduledTrigger implements Runnable, Closeable {
@ -303,7 +333,7 @@ public class ScheduledTriggers implements Closeable {
while ((event = queue.peekEvent()) != null) {
// override REPLAYING=true
event.getProperties().put(TriggerEvent.REPLAYING, true);
if (! trigger.getListener().triggerFired(event)) {
if (! trigger.getProcessor().process(event)) {
log.error("Failed to re-play event, discarding: " + event);
}
queue.pollEvent(); // always remove it from queue
@ -339,4 +369,170 @@ public class ScheduledTriggers implements Closeable {
IOUtils.closeQuietly(trigger);
}
}
private class TriggerListeners {
Map<String, Map<AutoScaling.EventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
Map<String, TriggerListener> listenersPerName = new HashMap<>();
ReentrantLock updateLock = new ReentrantLock();
void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
updateLock.lock();
// we will recreate this from scratch
listenersPerStage.clear();
try {
Set<String> triggerNames = autoScalingConfig.getTriggerConfigs().keySet();
Map<String, AutoScalingConfig.TriggerListenerConfig> configs = autoScalingConfig.getTriggerListenerConfigs();
Set<String> listenerNames = configs.entrySet().stream().map(entry -> entry.getValue().name).collect(Collectors.toSet());
// close those for non-existent triggers and nonexistent listener configs
for (Iterator<Map.Entry<String, TriggerListener>> it = listenersPerName.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, TriggerListener> entry = it.next();
String name = entry.getKey();
TriggerListener listener = entry.getValue();
if (!triggerNames.contains(listener.getConfig().trigger) || !listenerNames.contains(name)) {
try {
listener.close();
} catch (Exception e) {
log.warn("Exception closing old listener " + listener.getConfig(), e);
}
it.remove();
}
}
for (Map.Entry<String, AutoScalingConfig.TriggerListenerConfig> entry : configs.entrySet()) {
AutoScalingConfig.TriggerListenerConfig config = entry.getValue();
if (!triggerNames.contains(config.trigger)) {
log.debug("-- skipping listener for non-existent trigger: {}", config);
continue;
}
// find previous instance and reuse if possible
TriggerListener oldListener = listenersPerName.get(config.name);
TriggerListener listener = null;
if (oldListener != null) {
if (!oldListener.getConfig().equals(config)) { // changed config
try {
oldListener.close();
} catch (Exception e) {
log.warn("Exception closing old listener " + listener.getConfig(), e);
}
} else {
listener = oldListener; // reuse
}
}
if (listener == null) { // create new instance
String clazz = config.listenerClass;
try {
listener = coreContainer.getResourceLoader().newInstance(clazz, TriggerListener.class);
} catch (Exception e) {
log.warn("Invalid TriggerListener class name '" + clazz + "', skipping...", e);
}
try {
listener.init(coreContainer, config);
listenersPerName.put(config.name, listener);
} catch (Exception e) {
log.warn("Error initializing TriggerListener " + config, e);
try {
listener.close();
} catch (Exception e1) {
// ignore
}
listener = null;
}
}
if (listener == null) {
continue;
}
// add per stage
for (AutoScaling.EventProcessorStage stage : config.stages) {
addPerStage(config.trigger, stage, listener);
}
// add also for beforeAction / afterAction TriggerStage
if (!config.beforeActions.isEmpty()) {
addPerStage(config.trigger, AutoScaling.EventProcessorStage.BEFORE_ACTION, listener);
}
if (!config.afterActions.isEmpty()) {
addPerStage(config.trigger, AutoScaling.EventProcessorStage.AFTER_ACTION, listener);
}
}
} finally {
updateLock.unlock();
}
}
private void addPerStage(String triggerName, AutoScaling.EventProcessorStage stage, TriggerListener listener) {
Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage =
listenersPerStage.computeIfAbsent(triggerName, k -> new HashMap<>());
List<TriggerListener> lst = perStage.computeIfAbsent(stage, k -> new ArrayList<>(3));
lst.add(listener);
}
void reset() {
updateLock.lock();
try {
listenersPerStage.clear();
for (TriggerListener listener : listenersPerName.values()) {
IOUtils.closeQuietly(listener);
}
listenersPerName.clear();
} finally {
updateLock.unlock();
}
}
void close() {
reset();
}
List<TriggerListener> getTriggerListeners(String trigger, AutoScaling.EventProcessorStage stage) {
Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage = listenersPerStage.get(trigger);
if (perStage == null) {
return Collections.emptyList();
}
List<TriggerListener> lst = perStage.get(stage);
if (lst == null) {
return Collections.emptyList();
} else {
return Collections.unmodifiableList(lst);
}
}
void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage) {
fireListeners(trigger, event, stage, null, null, null, null);
}
void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String message) {
fireListeners(trigger, event, stage, null, null, null, message);
}
void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
ActionContext context) {
fireListeners(trigger, event, stage, actionName, context, null, null);
}
void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
updateLock.lock();
try {
for (TriggerListener listener : getTriggerListeners(trigger, stage)) {
if (actionName != null) {
AutoScalingConfig.TriggerListenerConfig config = listener.getConfig();
if (stage == AutoScaling.EventProcessorStage.BEFORE_ACTION) {
if (!config.beforeActions.contains(actionName)) {
continue;
}
} else if (stage == AutoScaling.EventProcessorStage.AFTER_ACTION) {
if (!config.afterActions.contains(actionName)) {
continue;
}
}
}
try {
listener.onEvent(event, stage, actionName, context, error, message);
} catch (Exception e) {
log.warn("Exception running listener " + listener.getConfig(), e);
}
}
} finally {
updateLock.unlock();
}
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.Map;
/**
* Base class for {@link TriggerAction} implementations.
*/
public abstract class TriggerActionBase implements TriggerAction {
protected Map<String, String> initArgs;
@Override
public String getName() {
if (initArgs != null) {
return initArgs.get("name");
} else {
return getClass().getSimpleName();
}
}
@Override
public void close() throws IOException {
}
@Override
public void init(Map<String, String> args) {
this.initArgs = args;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
import org.apache.solr.core.CoreContainer;
/**
* Implementations of this interface are notified of stages in event processing that they were
* registered for. Note: instances may be closed and re-created on each auto-scaling config update.
*/
public interface TriggerListener extends Closeable {
void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) throws Exception;
AutoScalingConfig.TriggerListenerConfig getConfig();
/**
* This method is called when either a particular <code>stage</code> or
* <code>actionName</code> is reached during event processing.
* @param event current event being processed
* @param stage {@link AutoScaling.EventProcessorStage} that this listener was registered for, or null
* @param actionName {@link TriggerAction} name that this listener was registered for, or null
* @param context optional {@link ActionContext} when the processing stage is related to an action, or null
* @param error optional {@link Throwable} error, or null
* @param message optional message
*/
void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
Throwable error, String message) throws Exception;
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import org.apache.solr.core.CoreContainer;
/**
* Base class for implementations of {@link TriggerListener}.
*/
public abstract class TriggerListenerBase implements TriggerListener {
protected AutoScalingConfig.TriggerListenerConfig config;
protected CoreContainer coreContainer;
@Override
public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
this.coreContainer = coreContainer;
this.config = config;
}
@Override
public AutoScalingConfig.TriggerListenerConfig getConfig() {
return config;
}
@Override
public void close() throws IOException {
}
}

View File

@ -356,7 +356,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
"'trigger' : 'node_lost_trigger'," +
"'stage' : ['STARTED','ABORTED','SUCCEEDED']," +
"'beforeAction' : 'execute_plan'," +
"'class' : 'org.apache.solr.cloud.autoscaling.AutoScaling$HttpCallbackListener'," +
"'class' : 'org.apache.solr.cloud.autoscaling.HttpTriggerListener'," +
"'url' : 'http://xyz.com/on_node_lost?node={$LOST_NODE_NAME}'" +
"}" +
"}";
@ -371,7 +371,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
assertTrue(listeners.containsKey("xyz"));
Map<String, Object> xyzListener = (Map<String, Object>) listeners.get("xyz");
assertEquals(5, xyzListener.size());
assertEquals("org.apache.solr.cloud.autoscaling.AutoScaling$HttpCallbackListener", xyzListener.get("class").toString());
assertEquals("org.apache.solr.cloud.autoscaling.HttpTriggerListener", xyzListener.get("class").toString());
String removeTriggerCommand = "{" +
"'remove-trigger' : {" +
@ -422,7 +422,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
"'trigger' : 'node_lost_trigger'," +
"'stage' : ['STARTED','ABORTED','SUCCEEDED']," +
"'beforeAction' : 'execute_plan'," +
"'class' : 'org.apache.solr.cloud.autoscaling.AutoScaling$HttpCallbackListener'," +
"'class' : 'org.apache.solr.cloud.autoscaling.AutoScaling$HttpTriggerListener'," +
"'url' : 'http://xyz.com/on_node_lost?node={$LOST_NODE_NAME}'" +
"}" +
"}";

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.util.LogLevel;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
/**
*
*/
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
@SolrTestCaseJ4.SuppressSSL
public class HttpTriggerListenerTest extends SolrCloudTestCase {
private static CountDownLatch triggerFiredLatch;
private MockService mockService;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
}
@Before
public void setupTest() throws Exception {
mockService = new MockService();
mockService.start();
triggerFiredLatch = new CountDownLatch(1);
}
@After
public void teardownTest() throws Exception {
if (mockService != null) {
mockService.close();
}
}
@Test
public void testHttpListenerIntegration() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger'," +
"'event' : 'nodeAdded'," +
"'waitFor' : '0s'," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'test','class':'" + TestDummyAction.class.getName() + "'}" +
"]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
String setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'foo'," +
"'trigger' : 'node_added_trigger'," +
"'stage' : ['WAITING', 'STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
"'beforeAction' : 'test'," +
"'afterAction' : ['test']," +
"'class' : '" + HttpTriggerListener.class.getName() + "'," +
"'url' : '" + mockService.server.getURI().toString() + "/${config.name:invalid}/${config.properties.xyz:invalid}/${stage}'," +
"'payload': 'actionName=${actionName}, source=${event.source}, type=${event.eventType}'," +
"'header.X-Foo' : '${config.name:invalid}'," +
"'xyz': 'foo'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
assertEquals(requests.toString(), 0, requests.size());
cluster.startJettySolrRunner();
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
Thread.sleep(5000);
assertEquals(requests.toString(), 4, requests.size());
requests.forEach(s -> assertTrue(s.contains("Content-Type: application/json")));
requests.forEach(s -> assertTrue(s.contains("X-Foo: foo")));
requests.forEach(s -> assertTrue(s.contains("source=node_added_trigger")));
requests.forEach(s -> assertTrue(s.contains("type=NODEADDED")));
String request = requests.get(0);
assertTrue(request, request.startsWith("/foo/foo/STARTED"));
assertTrue(request, request.contains("actionName=,")); // empty actionName
request = requests.get(1);
assertTrue(request, request.startsWith("/foo/foo/BEFORE_ACTION"));
assertTrue(request, request.contains("actionName=test,")); // actionName
request = requests.get(2);
assertTrue(request, request.startsWith("/foo/foo/AFTER_ACTION"));
assertTrue(request, request.contains("actionName=test,")); // actionName
request = requests.get(3);
assertTrue(request, request.startsWith("/foo/foo/SUCCEEDED"));
assertTrue(request, request.contains("actionName=,")); // empty actionName
}
public static class TestDummyAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) {
triggerFiredLatch.countDown();
}
}
static List<String> requests = new ArrayList<>();
private static class MockService extends Thread {
Server server;
public void start() {
server = new Server(new InetSocketAddress("localhost", 0));
server.setHandler(new AbstractHandler() {
@Override
public void handle(String s, Request request, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(httpServletRequest.getRequestURI());
Enumeration<String> headerNames = httpServletRequest.getHeaderNames();
while (headerNames.hasMoreElements()) {
stringBuilder.append('\n');
String name = headerNames.nextElement();
stringBuilder.append(name);
stringBuilder.append(": ");
stringBuilder.append(httpServletRequest.getHeader(name));
}
stringBuilder.append("\n\n");
ServletInputStream is = request.getInputStream();
byte[] httpInData = new byte[request.getContentLength()];
int len = -1;
while ((len = is.read(httpInData)) != -1) {
stringBuilder.append(new String(httpInData, 0, len, "UTF-8"));
}
requests.add(stringBuilder.toString());
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
request.setHandled(true);
}
});
try {
server.start();
for (int i = 0; i < 30; i++) {
Thread.sleep(1000);
if (server.isRunning()) {
break;
}
if (server.isFailed()) {
throw new Exception("MockService startup failed - the test will fail...");
}
}
} catch (Exception e) {
throw new RuntimeException("Exception starting MockService", e);
}
}
void close() throws Exception {
if (server != null) {
server.stop();
}
}
}
}

View File

@ -43,7 +43,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
private AutoScaling.TriggerListener noFirstRunListener = event -> {
private AutoScaling.EventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
@ -73,13 +73,13 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
trigger.setListener(noFirstRunListener);
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
trigger.setListener(event -> {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTime();
@ -112,12 +112,12 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
final long waitTime = 2;
props.put("waitFor", waitTime);
trigger.setListener(noFirstRunListener);
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setListener(event -> {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
long currentTimeNanos = timeSource.getTime();
long eventTimeNanos = event.getEventTime();
@ -196,14 +196,14 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
Map<String, Object> props = createTriggerProps(0);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
trigger.setListener(noFirstRunListener);
trigger.setProcessor(noFirstRunProcessor);
trigger.run(); // starts tracking live nodes
JettySolrRunner newNode = cluster.startJettySolrRunner();
AtomicInteger callCount = new AtomicInteger(0);
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setListener(event -> {
trigger.setProcessor(event -> {
if (callCount.incrementAndGet() < 2) {
return false;
} else {
@ -232,7 +232,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
// add a new node but update the trigger before the waitFor period expires
// and assert that the new trigger still fires
NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container);
trigger.setListener(noFirstRunListener);
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
@ -251,7 +251,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setListener(event -> {
newTrigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTime();

View File

@ -43,7 +43,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
private AutoScaling.TriggerListener noFirstRunListener = event -> {
private AutoScaling.EventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
@ -74,14 +74,14 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
trigger.setListener(noFirstRunListener);
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
String lostNodeName = cluster.getJettySolrRunner(1).getNodeName();
cluster.stopJettySolrRunner(1);
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
trigger.setListener(event -> {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTime();
@ -115,13 +115,13 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
final long waitTime = 2;
props.put("waitFor", waitTime);
trigger.setListener(noFirstRunListener);
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
JettySolrRunner lostNode = cluster.getJettySolrRunner(1);
lostNode.stop();
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setListener(event -> {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
long currentTimeNanos = timeSource.getTime();
long eventTimeNanos = event.getEventTime();
@ -210,7 +210,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
Map<String, Object> props = createTriggerProps(0);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) {
trigger.setListener(noFirstRunListener);
trigger.setProcessor(noFirstRunProcessor);
JettySolrRunner newNode = cluster.startJettySolrRunner();
cluster.waitForAllNodes(5);
@ -230,7 +230,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
AtomicInteger callCount = new AtomicInteger(0);
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setListener(event -> {
trigger.setProcessor(event -> {
if (callCount.incrementAndGet() < 2) {
return false;
} else {
@ -263,7 +263,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
// and assert that the new trigger still fires
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container);
trigger.setListener(noFirstRunListener);
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
// stop the newly created node
@ -291,7 +291,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setListener(event -> {
newTrigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTime();

View File

@ -17,8 +17,9 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -42,6 +43,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.TimeSource;
@ -566,17 +568,12 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
}
public static class TestTriggerAction implements TriggerAction {
public static class TestTriggerAction extends TriggerActionBase {
public TestTriggerAction() {
actionConstructorCalled.countDown();
}
@Override
public String getName() {
return "TestTriggerAction";
}
@Override
public void process(TriggerEvent event, ActionContext actionContext) {
try {
@ -595,29 +592,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
@Override
public void close() throws IOException {
}
@Override
public void init(Map<String, String> args) {
log.info("TestTriggerAction init");
actionInitCalled.countDown();
super.init(args);
}
}
public static class TestEventQueueAction implements TriggerAction {
public static class TestEventQueueAction extends TriggerActionBase {
public TestEventQueueAction() {
log.info("TestEventQueueAction instantiated");
}
@Override
public String getName() {
return this.getClass().getSimpleName();
}
@Override
public void process(TriggerEvent event, ActionContext actionContext) {
log.info("-- event: " + event);
@ -633,15 +621,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
@Override
public void close() throws IOException {
}
@Override
public void init(Map<String, String> args) {
log.debug("TestTriggerAction init");
actionInitCalled.countDown();
super.init(args);
}
}
@ -790,17 +774,12 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
return listener;
}
public static class TestEventMarkerAction implements TriggerAction {
public static class TestEventMarkerAction extends TriggerActionBase {
public TestEventMarkerAction() {
actionConstructorCalled.countDown();
}
@Override
public String getName() {
return "TestEventMarkerAction";
}
@Override
public void process(TriggerEvent event, ActionContext actionContext) {
boolean locked = lock.tryLock();
@ -819,15 +798,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
@Override
public void close() throws IOException {
}
@Override
public void init(Map<String, String> args) {
log.info("TestEventMarkerAction init");
actionInitCalled.countDown();
super.init(args);
}
}
@ -952,4 +927,209 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(overseerLeader, ev.getProperty(TriggerEvent.NODE_NAME));
assertEquals(AutoScaling.EventType.NODELOST, ev.getEventType());
}
private static class TestEvent {
final AutoScalingConfig.TriggerListenerConfig config;
final AutoScaling.EventProcessorStage stage;
final String actionName;
final TriggerEvent event;
final String message;
TestEvent(AutoScalingConfig.TriggerListenerConfig config, AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
this.config = config;
this.stage = stage;
this.actionName = actionName;
this.event = event;
this.message = message;
}
@Override
public String toString() {
return "TestEvent{" +
"config=" + config +
", stage=" + stage +
", actionName='" + actionName + '\'' +
", event=" + event +
", message='" + message + '\'' +
'}';
}
}
static Map<String, List<TestEvent>> listenerEvents = new HashMap<>();
static CountDownLatch listenerCreated = new CountDownLatch(1);
static boolean failDummyAction = false;
public static class TestTriggerListener extends TriggerListenerBase {
@Override
public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
super.init(coreContainer, config);
listenerCreated.countDown();
}
@Override
public synchronized void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<TestEvent> lst = listenerEvents.get(config.name);
if (lst == null) {
lst = new ArrayList<>();
listenerEvents.put(config.name, lst);
}
lst.add(new TestEvent(config, stage, actionName, event, message));
}
}
public static class TestDummyAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) {
if (failDummyAction) {
throw new RuntimeException("failure");
}
}
}
@Test
public void testListeners() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger'," +
"'event' : 'nodeAdded'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}," +
"{'name':'test1','class':'" + TestDummyAction.class.getName() + "'}," +
"]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
fail("The TriggerAction should have been created by now");
}
String setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'foo'," +
"'trigger' : 'node_added_trigger'," +
"'stage' : ['WAITING', 'STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
"'beforeAction' : 'test'," +
"'afterAction' : ['test', 'test1']," +
"'class' : '" + TestTriggerListener.class.getName() + "'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
String setListenerCommand1 = "{" +
"'set-listener' : " +
"{" +
"'name' : 'bar'," +
"'trigger' : 'node_added_trigger'," +
"'stage' : ['WAITING','FAILED','SUCCEEDED']," +
"'beforeAction' : ['test', 'test1']," +
"'afterAction' : 'test'," +
"'class' : '" + TestTriggerListener.class.getName() + "'" +
"}" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
listenerEvents.clear();
failDummyAction = false;
JettySolrRunner newNode = cluster.startJettySolrRunner();
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
assertEquals("both listeners should have fired", 2, listenerEvents.size());
Thread.sleep(2000);
// check foo events
List<TestEvent> testEvents = listenerEvents.get("foo");
assertNotNull("foo events: " + testEvents, testEvents);
assertEquals("foo events: " + testEvents, 5, testEvents.size());
assertEquals(AutoScaling.EventProcessorStage.STARTED, testEvents.get(0).stage);
assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
assertEquals("test", testEvents.get(1).actionName);
assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
assertEquals("test", testEvents.get(2).actionName);
assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(3).stage);
assertEquals("test1", testEvents.get(3).actionName);
assertEquals(AutoScaling.EventProcessorStage.SUCCEEDED, testEvents.get(4).stage);
// check bar events
testEvents = listenerEvents.get("bar");
assertNotNull("bar events", testEvents);
assertEquals("bar events", 4, testEvents.size());
assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
assertEquals("test", testEvents.get(0).actionName);
assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
assertEquals("test", testEvents.get(1).actionName);
assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
assertEquals("test1", testEvents.get(2).actionName);
assertEquals(AutoScaling.EventProcessorStage.SUCCEEDED, testEvents.get(3).stage);
// reset
triggerFired.set(false);
triggerFiredLatch = new CountDownLatch(1);
listenerEvents.clear();
failDummyAction = true;
newNode = cluster.startJettySolrRunner();
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
Thread.sleep(2000);
// check foo events
testEvents = listenerEvents.get("foo");
assertNotNull("foo events: " + testEvents, testEvents);
assertEquals("foo events: " + testEvents, 4, testEvents.size());
assertEquals(AutoScaling.EventProcessorStage.STARTED, testEvents.get(0).stage);
assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
assertEquals("test", testEvents.get(1).actionName);
assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
assertEquals("test", testEvents.get(2).actionName);
assertEquals(AutoScaling.EventProcessorStage.FAILED, testEvents.get(3).stage);
assertEquals("test1", testEvents.get(3).actionName);
// check bar events
testEvents = listenerEvents.get("bar");
assertNotNull("bar events", testEvents);
assertEquals("bar events", 4, testEvents.size());
assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
assertEquals("test", testEvents.get(0).actionName);
assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
assertEquals("test", testEvents.get(1).actionName);
assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
assertEquals("test1", testEvents.get(2).actionName);
assertEquals(AutoScaling.EventProcessorStage.FAILED, testEvents.get(3).stage);
assertEquals("test1", testEvents.get(3).actionName);
}
}

View File

@ -73,8 +73,8 @@ public class HttpClientUtil {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int DEFAULT_CONNECT_TIMEOUT = 60000;
private static final int DEFAULT_SO_TIMEOUT = 600000;
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_SO_TIMEOUT = 600000;
private static final int VALIDATE_AFTER_INACTIVITY_DEFAULT = 3000;
private static final int EVICT_IDLE_CONNECTIONS_DEFAULT = 50000;