mirror of https://github.com/apache/lucene.git
SOLR-13072: Management of markers for nodeLost / nodeAdded events is broken.
parent 71f024ac8f
commit 1f0e875db6

@@ -179,6 +179,9 @@ Bug Fixes
 
 * SOLR-10975: New Admin UI Query does not URL-encode the query produced in the URL box (janhoy)
 
+* SOLR-13072: Management of markers for nodeLost / nodeAdded events is broken. This bug could have caused
+  some events to be lost if they coincided with an Overseer leader crash. (ab)
+
 Improvements
 ----------------------
@@ -98,6 +98,7 @@ import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.URLUtil;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CloseHook;
@@ -973,10 +974,12 @@ public class ZkController implements Closeable {
       log.warn("Unable to read autoscaling.json", e1);
     }
     if (createNodes) {
+      byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", cloudManager.getTimeSource().getEpochTimeNs()));
       for (String n : oldNodes) {
         String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
+
         try {
-          zkClient.create(path, null, CreateMode.PERSISTENT, true);
+          zkClient.create(path, json, CreateMode.PERSISTENT, true);
         } catch (KeeperException.NodeExistsException e) {
           // someone else already created this node - ignore
         } catch (KeeperException | InterruptedException e1) {
@@ -1078,7 +1081,8 @@ public class ZkController implements Closeable {
     if (createMarkerNode && !zkClient.exists(nodeAddedPath, true)) {
       // use EPHEMERAL so that it disappears if this node goes down
       // and no other action is taken
-      ops.add(Op.create(nodeAddedPath, null, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
+      byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", TimeSource.NANO_TIME.getEpochTimeNs()));
+      ops.add(Op.create(nodeAddedPath, json, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
     }
     zkClient.multi(ops, true);
   }
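Both ZkController hunks above replace the empty (null) marker payload with a small JSON document, so the new InactiveMarkersPlanAction introduced below can later compute a marker's age. For reference, a minimal standalone sketch of what gets written; it uses the same Utils.toJSON call as the patch, while the wall-clock-to-nanos conversion here is an illustrative stand-in for TimeSource#getEpochTimeNs():

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.solr.common.util.Utils;

public class MarkerPayloadSketch {
  public static void main(String[] args) {
    // epoch time in nanoseconds, approximating TimeSource.NANO_TIME.getEpochTimeNs()
    long epochTimeNs = System.currentTimeMillis() * 1_000_000L;
    byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", epochTimeNs));
    // prints something like: {"timestamp":1546300800000000000}
    System.out.println(new String(json, StandardCharsets.UTF_8));
  }
}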
@@ -229,6 +229,10 @@ public class AutoScaling {
         "      'class':'solr.InactiveShardPlanAction'" +
         "    }," +
         "    {" +
+        "      'name':'inactive_markers_plan'," +
+        "      'class':'solr.InactiveMarkersPlanAction'" +
+        "    }," +
+        "    {" +
         "      'name':'execute_plan'," +
         "      'class':'solr.ExecutePlanAction'" +
         "    }" +
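With the four added lines, the default .scheduled_maintenance trigger now runs three actions in order: plan cleanup of inactive shards, plan cleanup of expired markers, then execute both plans. Reconstructed as plain JSON, the resulting default trigger config looks roughly like this (the name, event, and schedule fields are the documented defaults for this trigger and are not part of the hunk):

{
  "name": ".scheduled_maintenance",
  "event": "scheduled",
  "startTime": "NOW",
  "every": "+1DAY",
  "enabled": true,
  "actions": [
    {"name": "inactive_shard_plan", "class": "solr.InactiveShardPlanAction"},
    {"name": "inactive_markers_plan", "class": "solr.InactiveMarkersPlanAction"},
    {"name": "execute_plan", "class": "solr.ExecutePlanAction"}
  ]
}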
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This plan simply removes nodeAdded and nodeLost markers from Zookeeper if their TTL has
+ * expired. These markers are used by {@link NodeAddedTrigger} and {@link NodeLostTrigger} to
+ * ensure fault tolerance in case of Overseer leader crash.
+ */
+public class InactiveMarkersPlanAction extends TriggerActionBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String TTL_PROP = "ttl";
+
+  public static final int DEFAULT_TTL_SECONDS = 3600 * 24 * 2;
+
+  private int cleanupTTL;
+
+  public InactiveMarkersPlanAction() {
+    super();
+    TriggerUtils.validProperties(validProperties, TTL_PROP);
+  }
+
+  @Override
+  public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
+    super.configure(loader, cloudManager, properties);
+    String cleanupStr = String.valueOf(properties.getOrDefault(TTL_PROP, String.valueOf(DEFAULT_TTL_SECONDS)));
+    try {
+      cleanupTTL = Integer.parseInt(cleanupStr);
+    } catch (Exception e) {
+      throw new TriggerValidationException(getName(), TTL_PROP, "invalid value '" + cleanupStr + "': " + e.toString());
+    }
+    if (cleanupTTL < 0) {
+      throw new TriggerValidationException(getName(), TTL_PROP, "invalid value '" + cleanupStr + "', should be >= 0.");
+    }
+  }
+
+  @Override
+  public void process(TriggerEvent event, ActionContext context) throws Exception {
+    log.trace("-- {} cleaning markers", getName());
+    // use epoch time to track this across JVMs and nodes
+    long currentTimeNs = cloudManager.getTimeSource().getEpochTimeNs();
+    Map<String, Object> results = new LinkedHashMap<>();
+    Set<String> cleanedUp = new TreeSet<>();
+    cleanupMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, currentTimeNs, cleanedUp);
+    if (!cleanedUp.isEmpty()) {
+      results.put("nodeAdded", cleanedUp);
+      cleanedUp = new TreeSet<>();
+    }
+    cleanupMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, currentTimeNs, cleanedUp);
+    if (!cleanedUp.isEmpty()) {
+      results.put("nodeLost", cleanedUp);
+    }
+    if (!results.isEmpty()) {
+      context.getProperties().put(getName(), results);
+    }
+  }
+
+  private void cleanupMarkers(String path, long currentTimeNs, Set<String> cleanedUp) throws Exception {
+    DistribStateManager stateManager = cloudManager.getDistribStateManager();
+    if (!stateManager.hasData(path)) {
+      return;
+    }
+    List<String> markers = stateManager.listData(path);
+    markers.forEach(m -> {
+      String markerPath = path + "/" + m;
+      try {
+        Map<String, Object> payload = Utils.getJson(stateManager, markerPath);
+        if (payload.isEmpty()) {
+          log.trace(" -- ignore {}: either missing or unsupported format", markerPath);
+          return;
+        }
+        long timestamp = ((Number) payload.get("timestamp")).longValue();
+        long delta = TimeUnit.NANOSECONDS.toSeconds(currentTimeNs - timestamp);
+        if (delta > cleanupTTL) {
+          try {
+            stateManager.removeData(markerPath, -1);
+            log.trace(" -- remove {}, delta={}, ttl={}", markerPath, delta, cleanupTTL);
+            cleanedUp.add(m);
+          } catch (NoSuchElementException nse) {
+            // someone already removed it - ignore
+            return;
+          } catch (BadVersionException be) {
+            throw new RuntimeException("should never happen", be);
+          } catch (NotEmptyException ne) {
+            log.error("Marker znode should be empty but it's not! Ignoring {} ({})", markerPath, ne.toString());
+          }
+        } else {
+          log.trace(" -- keep {}, delta={}, ttl={}", markerPath, delta, cleanupTTL);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      } catch (IOException | KeeperException e) {
+        log.warn("Could not cleanup marker at {}, skipping... ({})", markerPath, e.getMessage());
+      }
+    });
+  }
+}
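The new action is configured through the usual autoscaling API. A minimal set-trigger request, modeled on the one used in ScheduledMaintenanceTriggerTest further down (ttl is in seconds and overrides DEFAULT_TTL_SECONDS, i.e. two days; the aggressive schedule here is only suitable for tests):

{
  "set-trigger": {
    "name": ".scheduled_maintenance",
    "event": "scheduled",
    "startTime": "NOW+20SECONDS",
    "every": "+2SECONDS",
    "enabled": true,
    "actions": [
      {"name": "inactive_markers_plan", "class": "solr.InactiveMarkersPlanAction", "ttl": "20"},
      {"name": "execute_plan", "class": "solr.ExecutePlanAction"}
    ]
  }
}

The TTL check works on epoch-time deltas: a marker with payload {"timestamp": t} is deleted once TimeUnit.NANOSECONDS.toSeconds(currentTimeNs - t) exceeds the configured ttl.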
@@ -74,7 +74,6 @@ public class NodeAddedTrigger extends TriggerBase {
           log.debug("Adding node from marker path: {}", n);
           nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getTimeNs());
         }
-        removeMarker(n);
       });
     } catch (NoSuchElementException e) {
       // ignore
@@ -187,14 +186,15 @@ public class NodeAddedTrigger extends TriggerBase {
         if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames, preferredOp))) {
           // remove from tracking set only if the fire was accepted
           nodeNames.forEach(n -> {
+            log.debug("Removing new node from tracking: {}", n);
             nodeNameVsTimeAdded.remove(n);
-            removeMarker(n);
           });
+        } else {
+          log.debug("Processor returned false for {}!", nodeNames);
         }
       } else {
         nodeNames.forEach(n -> {
           nodeNameVsTimeAdded.remove(n);
-          removeMarker(n);
         });
       }
     }
@@ -204,21 +204,6 @@ public class NodeAddedTrigger extends TriggerBase {
     }
   }
 
-  private void removeMarker(String nodeName) {
-    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
-    try {
-      log.debug("NodeAddedTrigger {} - removing marker path: {}", name, path);
-      if (stateManager.hasData(path)) {
-        stateManager.removeData(path, -1);
-      }
-    } catch (NoSuchElementException e) {
-      // ignore
-    } catch (Exception e) {
-      log.debug("Exception removing nodeAdded marker " + nodeName, e);
-    }
-
-  }
-
   public static class NodeAddedEvent extends TriggerEvent {
 
     public NodeAddedEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames, String preferredOp) {
@@ -63,7 +63,7 @@ public class NodeLostTrigger extends TriggerBase {
   public void init() throws Exception {
     super.init();
     lastLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
-    log.info("NodeLostTrigger {} - Initial livenodes: {}", name, lastLiveNodes);
+    log.debug("NodeLostTrigger {} - Initial livenodes: {}", name, lastLiveNodes);
    // pick up lost nodes for which marker paths were created
    try {
      List<String> lost = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
@@ -74,7 +74,6 @@ public class NodeLostTrigger extends TriggerBase {
           log.debug("Adding lost node from marker path: {}", n);
           nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
         }
-        removeMarker(n);
       });
     } catch (NoSuchElementException e) {
       // ignore
@@ -160,7 +159,7 @@ public class NodeLostTrigger extends TriggerBase {
       Set<String> copyOfLastLiveNodes = new HashSet<>(lastLiveNodes);
       copyOfLastLiveNodes.removeAll(newLiveNodes);
       copyOfLastLiveNodes.forEach(n -> {
-        log.info("Tracking lost node: {}", n);
+        log.debug("Tracking lost node: {}", n);
         nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
       });
 
@@ -187,7 +186,6 @@ public class NodeLostTrigger extends TriggerBase {
           // remove from tracking set only if the fire was accepted
           nodeNames.forEach(n -> {
             nodeNameVsTimeRemoved.remove(n);
-            removeMarker(n);
           });
         } else {
           log.debug("NodeLostTrigger processor for lost nodes: {} is not ready, will try later", nodeNames);
@@ -195,7 +193,6 @@ public class NodeLostTrigger extends TriggerBase {
       } else {
         nodeNames.forEach(n -> {
           nodeNameVsTimeRemoved.remove(n);
-          removeMarker(n);
         });
       }
     }
@@ -207,19 +204,6 @@ public class NodeLostTrigger extends TriggerBase {
     }
   }
 
-  private void removeMarker(String nodeName) {
-    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName;
-    try {
-      if (stateManager.hasData(path)) {
-        stateManager.removeData(path, -1);
-      }
-    } catch (NoSuchElementException e) {
-      // ignore
-    } catch (Exception e) {
-      log.warn("Exception removing nodeLost marker " + nodeName, e);
-    }
-  }
-
   public static class NodeLostEvent extends TriggerEvent {
 
     public NodeLostEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames, String preferredOp) {
@@ -22,7 +22,6 @@ import java.lang.invoke.MethodHandles;
 import java.net.ConnectException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -31,7 +30,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.common.AlreadyClosedException;
@@ -226,19 +224,11 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
           scheduledTriggers.remove(managedTriggerName);
         }
       }
-      // check for nodeLost triggers in the current config, and if
-      // absent then clean up old nodeLost / nodeAdded markers
-      boolean cleanOldNodeLostMarkers = true;
-      boolean cleanOldNodeAddedMarkers = true;
+      // nodeLost / nodeAdded markers are checked by triggers during their init() call
+      // which is invoked in scheduledTriggers.add(), so once this is done we can remove them
       try {
         // add new triggers and/or replace and close the replaced triggers
         for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
-          if (entry.getValue().getEventType().equals(TriggerEventType.NODELOST)) {
-            cleanOldNodeLostMarkers = false;
-          }
-          if (entry.getValue().getEventType().equals(TriggerEventType.NODEADDED)) {
-            cleanOldNodeAddedMarkers = false;
-          }
           try {
             scheduledTriggers.add(entry.getValue());
           } catch (AlreadyClosedException e) {
@@ -255,48 +245,19 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
           throw new IllegalStateException("Caught AlreadyClosedException from ScheduledTriggers, but we're not closed yet!", e);
         }
       }
-      DistribStateManager stateManager = cloudManager.getDistribStateManager();
-      if (cleanOldNodeLostMarkers) {
-        log.debug("-- clean old nodeLost markers");
-        try {
-          List<String> markers = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
-          markers.forEach(n -> {
-            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
-          });
-        } catch (NoSuchElementException e) {
-          // ignore
-        } catch (Exception e) {
-          log.warn("Error removing old nodeLost markers", e);
-        }
-      }
-      if (cleanOldNodeAddedMarkers) {
-        log.debug("-- clean old nodeAdded markers");
-        try {
-          List<String> markers = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
-          markers.forEach(n -> {
-            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
-          });
-        } catch (NoSuchElementException e) {
-          // ignore
-        } catch (AlreadyClosedException e) {
-
-        } catch (Exception e) {
-          log.warn("Error removing old nodeAdded markers", e);
-        }
-
-      }
+      log.debug("-- cleaning old nodeLost / nodeAdded markers");
+      removeMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+      removeMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
     }
   }
 
-  private void removeNodeMarker(String path, String nodeName) {
-    path = path + "/" + nodeName;
+  private void removeMarkers(String path) {
     try {
-      cloudManager.getDistribStateManager().removeData(path, -1);
-      log.debug(" -- deleted " + path);
+      cloudManager.getDistribStateManager().removeRecursively(path, true, false);
     } catch (NoSuchElementException e) {
       // ignore
     } catch (Exception e) {
-      log.warn("Error removing old marker " + path, e);
+      log.warn("Error removing old markers", e);
     }
   }
 
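The two per-node removeNodeMarker() loops collapse into a single removeRecursively() call per marker root. Assuming the argument order is (root, ignoreMissing, includeRoot), the trailing false deletes all child znodes while keeping the parent path itself in place, so triggers can keep watching it. An illustrative, map-based sketch of those semantics (not Solr API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecursiveRemoveSketch {
  // children keyed by parent path, e.g. "/autoscaling/nodeLost" -> [node1, node2]
  static final Map<String, List<String>> TREE = new HashMap<>();

  static void removeRecursively(String root, boolean includeRoot) {
    for (String child : TREE.getOrDefault(root, List.of())) {
      removeRecursively(root + "/" + child, true); // children are always removed
    }
    if (includeRoot) {
      TREE.remove(root);
    } else {
      TREE.put(root, new ArrayList<>()); // keep the root itself, now empty
    }
  }

  public static void main(String[] args) {
    TREE.put("/autoscaling/nodeLost", new ArrayList<>(List.of("node1", "node2")));
    removeRecursively("/autoscaling/nodeLost", false);
    System.out.println(TREE); // {/autoscaling/nodeLost=[]}
  }
}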
@@ -267,6 +267,8 @@ public class ScheduledTriggers implements Closeable {
         return false;
       } else {
         log.debug("++++++++ Cooldown inactive - processing event: " + event);
+        // start cooldown here to immediately reject other events
+        cooldownStart.set(cloudManager.getTimeSource().getTimeNs());
       }
       if (hasPendingActions.compareAndSet(false, true)) {
         // pause all triggers while we execute actions so triggers do not operate on a cluster in transition
@@ -286,6 +288,7 @@ public class ScheduledTriggers implements Closeable {
           String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s from trigger %s because the executor has already been closed", event.toString(), source);
           triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
           log.warn(msg);
           hasPendingActions.set(false);
+          // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
           return false;
         }
@@ -325,6 +328,7 @@ public class ScheduledTriggers implements Closeable {
               triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED);
               log.warn("Unhandled exception executing actions", e);
             } finally {
+              // update cooldown to the time when we actually finished processing the actions
               cooldownStart.set(cloudManager.getTimeSource().getTimeNs());
               hasPendingActions.set(false);
               // resume triggers after cool down period
@@ -348,6 +352,7 @@ public class ScheduledTriggers implements Closeable {
       }
       return true;
     } else {
       log.debug("Ignoring event {}, already processing other actions.", event.id);
+      // there is an action in the queue and we don't want to enqueue another until it is complete
       triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Already processing another event.");
       return false;
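Taken together, the ScheduledTriggers hunks make the cooldown window start twice: once when an event is accepted, so concurrent events are rejected immediately, and again when action processing actually finishes. A minimal sketch of that gate pattern (simplified: the real class reads time from TimeSource and takes the period from the autoscaling config, not a hardcoded constant):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CooldownGate {
  private final long cooldownPeriodNs = TimeUnit.SECONDS.toNanos(5);
  // start "expired" so the first event is accepted
  private final AtomicLong cooldownStart = new AtomicLong(System.nanoTime() - cooldownPeriodNs);

  public boolean tryEnter() {
    long now = System.nanoTime();
    if (now - cooldownStart.get() < cooldownPeriodNs) {
      return false;          // cooldown active - reject the event
    }
    cooldownStart.set(now);  // start cooldown here to immediately reject other events
    return true;
  }

  public void finished() {
    // update cooldown to the time when we actually finished processing the actions
    cooldownStart.set(System.nanoTime());
  }
}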
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrResourceLoader;
 import org.junit.After;
@@ -78,7 +79,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
     long waitForSeconds = 1 + random().nextInt(5);
     Map<String, Object> props = createTriggerProps(waitForSeconds);
 
-    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger")) {
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger1")) {
       final SolrCloudManager cloudManager = container.getZkController().getSolrCloudManager();
       trigger.configure(container.getResourceLoader(), cloudManager, props);
       trigger.init();
@@ -122,9 +123,13 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
       assertTrue(nodeNames.contains(newNode2.getNodeName()));
     }
 
+    // clean nodeAdded markers - normally done by OverseerTriggerThread
+    container.getZkController().getSolrCloudManager().getDistribStateManager()
+        .removeRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, true, false);
+
     // add a new node but remove it before the waitFor period expires
     // and assert that the trigger doesn't fire at all
-    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger")) {
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger2")) {
       final SolrCloudManager cloudManager = container.getZkController().getSolrCloudManager();
       trigger.configure(container.getResourceLoader(), cloudManager, props);
       trigger.init();
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrResourceLoader;
@@ -78,7 +79,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
     long waitForSeconds = 1 + random().nextInt(5);
     Map<String, Object> props = createTriggerProps(waitForSeconds);
 
-    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger")) {
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger1")) {
       final SolrCloudManager cloudManager = container.getZkController().getSolrCloudManager();
       trigger.configure(container.getResourceLoader(), cloudManager, props);
       trigger.init();
@@ -125,9 +126,13 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
 
     }
 
+    // clean nodeLost markers - normally done by OverseerTriggerThread
+    container.getZkController().getSolrCloudManager().getDistribStateManager()
+        .removeRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, true, false);
+
     // remove a node but add it back before the waitFor period expires
     // and assert that the trigger doesn't fire at all
-    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger")) {
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger2")) {
       final SolrCloudManager cloudManager = container.getZkController().getSolrCloudManager();
       trigger.configure(container.getResourceLoader(), cloudManager, props);
       final long waitTime = 2;
@@ -140,8 +145,10 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
       String lostNodeName = lostNode.getNodeName();
       lostNode.stop();
       AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
       trigger.setProcessor(event -> {
         if (fired.compareAndSet(false, true)) {
+          eventRef.set(event);
           long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
           long eventTimeNanos = event.getEventTime();
           long waitForNanos = TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
@@ -175,7 +182,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
       } while (true);
 
       // ensure the event was not fired
-      assertFalse(fired.get());
+      assertFalse("event was fired: " + eventRef.get(), fired.get());
     }
   }
 
@@ -100,18 +100,19 @@ public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
         break;
       }
     }
-    // add a node
+    // add a nodes
     JettySolrRunner node = cluster.startJettySolrRunner();
+    cluster.waitForAllNodes(30);
     if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
       fail("onChange listener didn't execute on cluster change");
     }
     assertEquals(1, listener.addedNodes.size());
-    assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
+    assertTrue(listener.addedNodes.toString(), listener.addedNodes.contains(node.getNodeName()));
     // verify that a znode doesn't exist (no trigger)
     String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
     assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
     listener.reset();
 
     // stop overseer
     log.info("====== KILL OVERSEER 1");
     JettySolrRunner j = cluster.stopJettySolrRunner(overseerLeaderIndex);
@@ -145,8 +146,7 @@ public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
       // okay
     }
 
-    // verify that a znode does NOT exist - there's no nodeLost trigger,
-    // so the new overseer cleaned up existing nodeLost markers
+    // verify that a znode does NOT exist - the new overseer cleaned up existing nodeLost markers
     assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
 
     listener.reset();
@@ -218,9 +218,29 @@ public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
     listenerEventLatch.countDown(); // let the trigger thread continue
 
     assertTrue(triggerFiredLatch.await(10, TimeUnit.SECONDS));
+    Thread.sleep(5000);
     // nodeAdded marker should be consumed now by nodeAdded trigger
     assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
 
+    // kill this node
+    listener.reset();
+    events.clear();
+    triggerFiredLatch = new CountDownLatch(1);
+
+    String node1Name = node1.getNodeName();
+    cluster.stopJettySolrRunner(node1);
+    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+    assertEquals(1, listener.lostNodes.size());
+    assertEquals(node1Name, listener.lostNodes.iterator().next());
+    // verify that a znode exists
+    String pathLost2 = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + node1Name;
+    assertTrue("Path " + pathLost2 + " wasn't created", zkClient().exists(pathLost2, true));
+
+    listenerEventLatch.countDown(); // let the trigger thread continue
+
+    assertTrue(triggerFiredLatch.await(10, TimeUnit.SECONDS));
+
+    // triggers don't remove markers
+    assertTrue("Path " + pathLost2 + " should still exist", zkClient().exists(pathLost2, true));
+
     listener.reset();
     events.clear();
@@ -28,9 +28,11 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.CloudTestUtils;
 import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
@@ -38,6 +40,7 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
@@ -83,10 +86,45 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
   }
 
   @Before
-  public void initTest() {
+  public void initTest() throws Exception {
+    // disable .scheduled_maintenance
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+    String setPropertiesCommand = "{" +
+        "'set-properties' : {" +
+        "'" + AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS + "': 1" +
+        "}" +
+        "}";
+    response = solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
+    assertEquals(response.get("result").toString(), "success");
     triggerFired = new CountDownLatch(1);
   }
 
+  private String addNode() throws Exception {
+    if (cloudManager instanceof SimCloudManager) {
+      return ((SimCloudManager) cloudManager).simAddNode();
+    } else {
+      return cluster.startJettySolrRunner().getNodeName();
+    }
+  }
+
+  private void stopNode(String nodeName) throws Exception {
+    if (cloudManager instanceof SimCloudManager) {
+      ((SimCloudManager) cloudManager).simRemoveNode(nodeName, true);
+    } else {
+      for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+        if (jetty.getNodeName().equals(nodeName)) {
+          cluster.stopJettySolrRunner(jetty);
+          break;
+        }
+      }
+    }
+  }
+
   @After
   public void restoreDefaults() throws Exception {
     SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST,
@@ -118,9 +156,10 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
     log.info(autoScalingConfig.toString());
     AutoScalingConfig.TriggerConfig triggerConfig = autoScalingConfig.getTriggerConfigs().get(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME);
     assertNotNull(triggerConfig);
-    assertEquals(2, triggerConfig.actions.size());
+    assertEquals(3, triggerConfig.actions.size());
     assertTrue(triggerConfig.actions.get(0).actionClass.endsWith(InactiveShardPlanAction.class.getSimpleName()));
-    assertTrue(triggerConfig.actions.get(1).actionClass.endsWith(ExecutePlanAction.class.getSimpleName()));
+    assertTrue(triggerConfig.actions.get(1).actionClass.endsWith(InactiveMarkersPlanAction.class.getSimpleName()));
+    assertTrue(triggerConfig.actions.get(2).actionClass.endsWith(ExecutePlanAction.class.getSimpleName()));
     AutoScalingConfig.TriggerListenerConfig listenerConfig = autoScalingConfig.getTriggerListenerConfigs().get(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + ".system");
     assertNotNull(listenerConfig);
     assertEquals(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME, listenerConfig.trigger);
@@ -153,7 +192,7 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
 
     @Override
     public void process(TriggerEvent event, ActionContext context) throws Exception {
-      if (context.getProperties().containsKey("inactive_shard_plan")) {
+      if (context.getProperties().containsKey("inactive_shard_plan") || context.getProperties().containsKey("inactive_markers_plan")) {
        triggerFired.countDown();
      }
    }
@@ -201,7 +240,7 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
         "'name' : '" + AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + "'," +
         "'event' : 'scheduled'," +
         "'startTime' : 'NOW+10SECONDS'," +
-        "'every' : '+2SECONDS'," +
+        "'every' : '+2SECONDS'," + // must be longer than the cooldown period
         "'enabled' : true," +
         "'actions' : [{'name' : 'inactive_shard_plan', 'class' : 'solr.InactiveShardPlanAction', 'ttl' : '20'}," +
         "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}," +
@@ -272,4 +311,69 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
 
     CloudTestUtils.clusterShape(2, 1).matches(state.getLiveNodes(), state.getCollection(collection1));
   }
 
+  public static CountDownLatch getTriggerFired() {
+    return triggerFired;
+  }
+
+  public static class TestTriggerAction2 extends TriggerActionBase {
+
+    @Override
+    public void process(TriggerEvent event, ActionContext context) throws Exception {
+      getTriggerFired().countDown();
+    }
+  }
+
+
+  @Test
+  public void testInactiveMarkersCleanup() throws Exception {
+    triggerFired = new CountDownLatch(1);
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'trigger1'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor': '1s'," +
+        "'enabled' : true," +
+        "'actions' : [" +
+        "{'name' : 'test', 'class' : '" + TestTriggerAction2.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : '" + AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + "'," +
+        "'event' : 'scheduled'," +
+        "'startTime' : 'NOW+20SECONDS'," +
+        "'every' : '+2SECONDS'," + // must be longer than the cooldown period!!
+        "'enabled' : true," +
+        "'actions' : [{'name' : 'inactive_markers_plan', 'class' : 'solr.InactiveMarkersPlanAction', 'ttl' : '20'}," +
+        "{'name' : 'test', 'class' : '" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    cloudManager.getTimeSource().sleep(5000);
+
+    triggerFired = new CountDownLatch(1);
+    String node = addNode();
+
+    boolean await = triggerFired.await(30, TimeUnit.SECONDS);
+    assertTrue("trigger should have fired", await);
+
+    triggerFired = new CountDownLatch(1);
+
+    // should have a marker
+    DistribStateManager stateManager = cloudManager.getDistribStateManager();
+    String nodeAddedPath = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node;
+    assertTrue("marker for nodeAdded doesn't exist", stateManager.hasData(nodeAddedPath));
+
+    // wait for the cleanup to fire
+    await = triggerFired.await(90, TimeUnit.SECONDS);
+    assertTrue("cleanup trigger should have fired", await);
+    assertFalse("marker for nodeAdded still exists", stateManager.hasData(nodeAddedPath));
+  }
 }
@@ -285,10 +285,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     if (liveNodes.contains(nodeId)) {
       throw new Exception("Node " + nodeId + " already exists");
     }
-    liveNodes.add(nodeId);
     createEphemeralLiveNode(nodeId);
-    updateOverseerLeader();
     nodeReplicaMap.putIfAbsent(nodeId, new ArrayList<>());
+    liveNodes.add(nodeId);
+    updateOverseerLeader();
   }
 
   /**
@@ -310,12 +310,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     }
     // remove ephemeral nodes
     stateManager.getRoot().removeEphemeralChildren(nodeId);
-    updateOverseerLeader();
     // create a nodeLost marker if needed
     AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
     if (cfg.hasTriggerForEvents(TriggerEventType.NODELOST)) {
-      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId);
+      String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId;
+      byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", cloudManager.getTimeSource().getEpochTimeNs()));
+      stateManager.makePath(path,
+          json, CreateMode.PERSISTENT, false);
+      log.debug(" -- created marker: {}", path);
     }
+    updateOverseerLeader();
     if (!collections.isEmpty()) {
       simRunLeaderElection(collections, true);
     }
@@ -388,7 +392,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       mgr.makePath(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
       AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
       if (cfg.hasTriggerForEvents(TriggerEventType.NODEADDED)) {
-        mgr.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
+        byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", cloudManager.getTimeSource().getEpochTimeNs()));
+        String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId;
+        log.debug("-- creating marker: {}", path);
+        mgr.makePath(path, json, CreateMode.EPHEMERAL, true);
       }
     }
 