mirror of https://github.com/apache/lucene.git
SOLR-13376: Multi-node race condition to create/remove nodeLost markers.
This commit is contained in:
parent
e3bd5a7da2
commit
8c144444fe
|
@ -265,6 +265,8 @@ Bug Fixes
|
||||||
|
|
||||||
* SOLR-13790: LRUStatsCache size explosion and ineffective caching. (ab)
|
* SOLR-13790: LRUStatsCache size explosion and ineffective caching. (ab)
|
||||||
|
|
||||||
|
* SOLR-13376: Multi-node race condition to create/remove nodeLost markers. (hoss, ab)
|
||||||
|
|
||||||
Other Changes
|
Other Changes
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,9 @@ import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_ACTIVE;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_STATE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This plan simply removes nodeAdded and nodeLost markers from Zookeeper if their TTL has
|
* This plan simply removes nodeAdded and nodeLost markers from Zookeeper if their TTL has
|
||||||
* expired. These markers are used by {@link NodeAddedTrigger} and {@link NodeLostTrigger} to
|
* expired. These markers are used by {@link NodeAddedTrigger} and {@link NodeLostTrigger} to
|
||||||
|
@ -105,12 +108,14 @@ public class InactiveMarkersPlanAction extends TriggerActionBase {
|
||||||
log.trace(" -- ignore {}: either missing or unsupported format", markerPath);
|
log.trace(" -- ignore {}: either missing or unsupported format", markerPath);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
boolean activeMarker = payload.getOrDefault(MARKER_STATE, MARKER_ACTIVE)
|
||||||
|
.equals(MARKER_ACTIVE);
|
||||||
long timestamp = ((Number)payload.get("timestamp")).longValue();
|
long timestamp = ((Number)payload.get("timestamp")).longValue();
|
||||||
long delta = TimeUnit.NANOSECONDS.toSeconds(currentTimeNs - timestamp);
|
long delta = TimeUnit.NANOSECONDS.toSeconds(currentTimeNs - timestamp);
|
||||||
if (delta > cleanupTTL) {
|
if (delta > cleanupTTL || !activeMarker) {
|
||||||
try {
|
try {
|
||||||
stateManager.removeData(markerPath, -1);
|
stateManager.removeData(markerPath, -1);
|
||||||
log.trace(" -- remove {}, delta={}, ttl={}", markerPath, delta, cleanupTTL);
|
log.trace(" -- remove {}, delta={}, ttl={}, active={}", markerPath, delta, cleanupTTL, activeMarker);
|
||||||
cleanedUp.add(m);
|
cleanedUp.add(m);
|
||||||
} catch (NoSuchElementException nse) {
|
} catch (NoSuchElementException nse) {
|
||||||
// someone already removed it - ignore
|
// someone already removed it - ignore
|
||||||
|
@ -121,7 +126,7 @@ public class InactiveMarkersPlanAction extends TriggerActionBase {
|
||||||
log.error("Marker znode should be empty but it's not! Ignoring {} ({})", markerPath, ne.toString());
|
log.error("Marker znode should be empty but it's not! Ignoring {} ({})", markerPath, ne.toString());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.trace(" -- keep {}, delta={}, ttl={}", markerPath, delta, cleanupTTL);
|
log.trace(" -- keep {}, delta={}, ttl={}, active={}", markerPath, delta, cleanupTTL, activeMarker);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.solr.cloud.autoscaling;
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -36,10 +37,15 @@ import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.CollectionParams;
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.core.SolrResourceLoader;
|
import org.apache.solr.core.SolrResourceLoader;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_ACTIVE;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_INACTIVE;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_STATE;
|
||||||
import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
|
import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
|
||||||
import static org.apache.solr.common.params.AutoScalingParams.REPLICA_TYPE;
|
import static org.apache.solr.common.params.AutoScalingParams.REPLICA_TYPE;
|
||||||
|
|
||||||
|
@ -71,6 +77,16 @@ public class NodeAddedTrigger extends TriggerBase {
|
||||||
try {
|
try {
|
||||||
List<String> added = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
|
List<String> added = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
|
||||||
added.forEach(n -> {
|
added.forEach(n -> {
|
||||||
|
String markerPath = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + n;
|
||||||
|
try {
|
||||||
|
Map<String, Object> markerData = Utils.getJson(stateManager, markerPath);
|
||||||
|
// skip inactive markers
|
||||||
|
if (markerData.getOrDefault(MARKER_STATE, MARKER_ACTIVE).equals(MARKER_INACTIVE)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException | IOException | KeeperException e) {
|
||||||
|
log.debug("-- ignoring marker " + markerPath + " state due to error", e);
|
||||||
|
}
|
||||||
// don't add nodes that have since gone away
|
// don't add nodes that have since gone away
|
||||||
if (lastLiveNodes.contains(n) && !nodeNameVsTimeAdded.containsKey(n)) {
|
if (lastLiveNodes.contains(n) && !nodeNameVsTimeAdded.containsKey(n)) {
|
||||||
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeAdded for a node may also be restored
|
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeAdded for a node may also be restored
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.solr.cloud.autoscaling;
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -36,10 +37,15 @@ import org.apache.solr.common.AlreadyClosedException;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.CollectionParams;
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.core.SolrResourceLoader;
|
import org.apache.solr.core.SolrResourceLoader;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_ACTIVE;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_INACTIVE;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_STATE;
|
||||||
import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
|
import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -68,6 +74,16 @@ public class NodeLostTrigger extends TriggerBase {
|
||||||
try {
|
try {
|
||||||
List<String> lost = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
|
List<String> lost = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
|
||||||
lost.forEach(n -> {
|
lost.forEach(n -> {
|
||||||
|
String markerPath = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
|
||||||
|
try {
|
||||||
|
Map<String, Object> markerData = Utils.getJson(stateManager, markerPath);
|
||||||
|
// skip inactive markers
|
||||||
|
if (markerData.getOrDefault(MARKER_STATE, MARKER_ACTIVE).equals(MARKER_INACTIVE)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException | IOException | KeeperException e) {
|
||||||
|
log.debug("-- ignoring marker " + markerPath + " state due to error", e);
|
||||||
|
}
|
||||||
// don't add nodes that have since came back
|
// don't add nodes that have since came back
|
||||||
if (!lastLiveNodes.contains(n) && !nodeNameVsTimeRemoved.containsKey(n)) {
|
if (!lastLiveNodes.contains(n) && !nodeNameVsTimeRemoved.containsKey(n)) {
|
||||||
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeRemoved for a node may also be restored
|
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeRemoved for a node may also be restored
|
||||||
|
|
|
@ -22,12 +22,14 @@ import java.lang.invoke.MethodHandles;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.cloud.DistribStateManager;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
|
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
|
||||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
|
@ -55,6 +57,11 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
public static final String MARKER_STATE = "state";
|
||||||
|
public static final String MARKER_ACTIVE = "active";
|
||||||
|
public static final String MARKER_INACTIVE = "inactive";
|
||||||
|
|
||||||
|
|
||||||
private final SolrCloudManager cloudManager;
|
private final SolrCloudManager cloudManager;
|
||||||
|
|
||||||
private final CloudConfig cloudConfig;
|
private final CloudConfig cloudConfig;
|
||||||
|
@ -252,20 +259,31 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
|
||||||
throw new IllegalStateException("Caught AlreadyClosedException from ScheduledTriggers, but we're not closed yet!", e);
|
throw new IllegalStateException("Caught AlreadyClosedException from ScheduledTriggers, but we're not closed yet!", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.debug("-- cleaning old nodeLost / nodeAdded markers");
|
log.debug("-- deactivating old nodeLost / nodeAdded markers");
|
||||||
removeMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
|
deactivateMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
|
||||||
removeMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
|
deactivateMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
|
||||||
processedZnodeVersion = znodeVersion;
|
processedZnodeVersion = znodeVersion;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeMarkers(String path) {
|
private void deactivateMarkers(String path) {
|
||||||
|
DistribStateManager stateManager = cloudManager.getDistribStateManager();
|
||||||
try {
|
try {
|
||||||
cloudManager.getDistribStateManager().removeRecursively(path, true, false);
|
List<String> markers = stateManager.listData(path);
|
||||||
|
for (String marker : markers) {
|
||||||
|
String markerPath = path + "/" + marker;
|
||||||
|
try {
|
||||||
|
Map<String, Object> markerMap = new HashMap<>(Utils.getJson(stateManager, markerPath));
|
||||||
|
markerMap.put(MARKER_STATE, MARKER_INACTIVE);
|
||||||
|
stateManager.setData(markerPath, Utils.toJSON(markerMap), -1);
|
||||||
|
} catch (NoSuchElementException e) {
|
||||||
|
// ignore - already deleted
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (NoSuchElementException e) {
|
} catch (NoSuchElementException e) {
|
||||||
// ignore
|
// ignore
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Error removing old markers", e);
|
log.warn("Error deactivating old markers", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,12 +20,14 @@ package org.apache.solr.cloud.autoscaling;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.SortedSet;
|
import java.util.SortedSet;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
@ -41,6 +43,7 @@ import org.apache.solr.common.cloud.LiveNodesListener;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.common.util.TimeSource;
|
import org.apache.solr.common.util.TimeSource;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.util.LogLevel;
|
import org.apache.solr.util.LogLevel;
|
||||||
import org.apache.solr.util.TimeOut;
|
import org.apache.solr.util.TimeOut;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -50,6 +53,10 @@ import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_ACTIVE;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_INACTIVE;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_STATE;
|
||||||
|
|
||||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
|
public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
@ -81,7 +88,7 @@ public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
|
||||||
return triggerFiredLatch;
|
return triggerFiredLatch;
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13376")
|
//@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13376")
|
||||||
@Test
|
@Test
|
||||||
public void testNodeMarkersRegistration() throws Exception {
|
public void testNodeMarkersRegistration() throws Exception {
|
||||||
triggerFiredLatch = new CountDownLatch(1);
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
@ -135,10 +142,16 @@ public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
|
||||||
String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
|
String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
|
||||||
|
|
||||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||||
|
AtomicBoolean markerInactive = new AtomicBoolean();
|
||||||
try {
|
try {
|
||||||
timeout.waitFor("zk path to go away", () -> {
|
timeout.waitFor("nodeLost marker to get inactive", () -> {
|
||||||
try {
|
try {
|
||||||
return !zkClient().exists(pathLost, true);
|
if (!zkClient().exists(pathLost, true)) {
|
||||||
|
throw new RuntimeException("marker " + pathLost + " should exist!");
|
||||||
|
}
|
||||||
|
Map<String, Object> markerData = Utils.getJson(zkClient(), pathLost, true);
|
||||||
|
markerInactive.set(markerData.getOrDefault(MARKER_STATE, MARKER_ACTIVE).equals(MARKER_INACTIVE));
|
||||||
|
return markerInactive.get();
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -149,8 +162,8 @@ public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
|
||||||
// okay
|
// okay
|
||||||
}
|
}
|
||||||
|
|
||||||
// verify that a znode does NOT exist - the new overseer cleaned up existing nodeLost markers
|
// verify that the marker is inactive - the new overseer should deactivate markers once they are processed
|
||||||
assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
|
assertTrue("Marker " + pathLost + " still active!", markerInactive.get());
|
||||||
|
|
||||||
listener.reset();
|
listener.reset();
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
import org.apache.solr.cloud.CloudTestUtils;
|
import org.apache.solr.cloud.CloudTestUtils;
|
||||||
import org.apache.solr.cloud.CloudUtil;
|
import org.apache.solr.cloud.CloudUtil;
|
||||||
|
@ -62,6 +63,7 @@ import org.apache.solr.common.MapWriter;
|
||||||
import org.apache.solr.common.cloud.LiveNodesListener;
|
import org.apache.solr.common.cloud.LiveNodesListener;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.util.TimeSource;
|
import org.apache.solr.common.util.TimeSource;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.core.SolrResourceLoader;
|
import org.apache.solr.core.SolrResourceLoader;
|
||||||
import org.apache.solr.util.LogLevel;
|
import org.apache.solr.util.LogLevel;
|
||||||
import org.apache.solr.util.TimeOut;
|
import org.apache.solr.util.TimeOut;
|
||||||
|
@ -74,6 +76,10 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.AtomicDouble;
|
import com.google.common.util.concurrent.AtomicDouble;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_ACTIVE;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_INACTIVE;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_STATE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An end-to-end integration test for triggers
|
* An end-to-end integration test for triggers
|
||||||
*/
|
*/
|
||||||
|
@ -864,10 +870,6 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
|
|
||||||
public static class TestEventMarkerAction extends TriggerActionBase {
|
public static class TestEventMarkerAction extends TriggerActionBase {
|
||||||
|
|
||||||
public TestEventMarkerAction() {
|
|
||||||
actionConstructorCalled.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
boolean locked = lock.tryLock();
|
boolean locked = lock.tryLock();
|
||||||
|
@ -887,19 +889,29 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> args) throws TriggerValidationException {
|
public void init() throws Exception {
|
||||||
log.info("TestEventMarkerAction init");
|
log.info("TestEventMarkerAction init");
|
||||||
actionInitCalled.countDown();
|
super.init();
|
||||||
super.configure(loader, cloudManager, args);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class AssertingListener extends TriggerListenerBase {
|
||||||
|
@Override
|
||||||
|
public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
|
||||||
|
if (!Thread.currentThread().getName().startsWith("ScheduledTrigger")) {
|
||||||
|
// for future safety
|
||||||
|
throw new IllegalThreadStateException("AssertingListener should have been invoked by a thread from the scheduled trigger thread pool");
|
||||||
|
}
|
||||||
|
log.debug(" --- listener fired for event: {}, stage: {}", event, stage);
|
||||||
|
listenerEventLatch.await();
|
||||||
|
log.debug(" --- listener wait complete for event: {}, stage: {}", event, stage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeMarkersRegistration() throws Exception {
|
public void testNodeMarkersRegistration() throws Exception {
|
||||||
// for this test we want to create two triggers so we must assert that the actions were created twice
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
actionInitCalled = new CountDownLatch(2);
|
listenerEventLatch = new CountDownLatch(1);
|
||||||
// similarly we want both triggers to fire
|
|
||||||
triggerFiredLatch = new CountDownLatch(2);
|
|
||||||
TestLiveNodesListener listener = registerLiveNodesListener();
|
TestLiveNodesListener listener = registerLiveNodesListener();
|
||||||
|
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
|
@ -912,7 +924,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
assertTrue("cluster onChange listener didn't execute even after await()ing an excessive amount of time",
|
assertTrue("cluster onChange listener didn't execute even after await()ing an excessive amount of time",
|
||||||
listener.onChangeLatch.await(60, TimeUnit.SECONDS));
|
listener.onChangeLatch.await(60, TimeUnit.SECONDS));
|
||||||
assertEquals(1, listener.addedNodes.size());
|
assertEquals(1, listener.addedNodes.size());
|
||||||
assertEquals(node, listener.addedNodes.iterator().next());
|
assertTrue(listener.addedNodes.toString(), listener.addedNodes.contains(node));
|
||||||
// verify that a znode doesn't exist (no trigger)
|
// verify that a znode doesn't exist (no trigger)
|
||||||
String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node;
|
String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node;
|
||||||
assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers",
|
assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers",
|
||||||
|
@ -931,22 +943,28 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
assertEquals(0, listener.addedNodes.size());
|
assertEquals(0, listener.addedNodes.size());
|
||||||
// wait until the new overseer is up
|
// wait until the new overseer is up
|
||||||
cluster.getTimeSource().sleep(5000);
|
cluster.getTimeSource().sleep(5000);
|
||||||
// verify that a znode does NOT exist - there's no nodeLost trigger,
|
|
||||||
// so the new overseer cleaned up existing nodeLost markers
|
|
||||||
|
|
||||||
String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
|
String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
|
||||||
|
|
||||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||||
timeout.waitFor("Path " + pathLost + " exists", () -> {
|
AtomicBoolean markerInactive = new AtomicBoolean();
|
||||||
|
timeout.waitFor("nodeLost marker to get inactive", () -> {
|
||||||
try {
|
try {
|
||||||
return !cluster.getDistribStateManager().hasData(pathLost);
|
if (!cluster.getDistribStateManager().hasData(pathLost)) {
|
||||||
|
throw new RuntimeException("marker " + pathLost + " should exist!");
|
||||||
|
}
|
||||||
|
Map<String, Object> markerData = Utils.getJson(cluster.getDistribStateManager(), pathLost);
|
||||||
|
markerInactive.set(markerData.getOrDefault(MARKER_STATE, MARKER_ACTIVE).equals(MARKER_INACTIVE));
|
||||||
|
return markerInactive.get();
|
||||||
|
|
||||||
} catch (IOException | KeeperException | InterruptedException e) {
|
} catch (IOException | KeeperException | InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
assertFalse("Path " + pathLost + " exists", cluster.getDistribStateManager().hasData(pathLost));
|
// verify that the marker is inactive - the new overseer should deactivate markers once they are processed
|
||||||
|
assertTrue("Marker " + pathLost + " still active!", markerInactive.get());
|
||||||
|
|
||||||
listener.reset();
|
listener.reset();
|
||||||
|
|
||||||
|
@ -956,7 +974,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
assertAutoScalingRequest
|
assertAutoScalingRequest
|
||||||
("{" +
|
("{" +
|
||||||
"'set-trigger' : {" +
|
"'set-trigger' : {" +
|
||||||
"'name' : 'node_added_trigger'," +
|
"'name' : 'node_added_triggerMR'," +
|
||||||
"'event' : 'nodeAdded'," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'waitFor' : '1s'," +
|
"'waitFor' : '1s'," +
|
||||||
"'enabled' : true," +
|
"'enabled' : true," +
|
||||||
|
@ -966,14 +984,25 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
assertAutoScalingRequest
|
assertAutoScalingRequest
|
||||||
("{" +
|
("{" +
|
||||||
"'set-trigger' : {" +
|
"'set-trigger' : {" +
|
||||||
"'name' : 'node_lost_trigger'," +
|
"'name' : 'node_lost_triggerMR'," +
|
||||||
"'event' : 'nodeLost'," +
|
"'event' : 'nodeLost'," +
|
||||||
"'waitFor' : '1s'," +
|
"'waitFor' : '1s'," +
|
||||||
"'enabled' : true," +
|
"'enabled' : true," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
|
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
|
||||||
"}}");
|
"}}");
|
||||||
|
|
||||||
|
assertAutoScalingRequest(
|
||||||
|
"{\n" +
|
||||||
|
" \"set-listener\" : {\n" +
|
||||||
|
" \"name\" : \"listener_node_added_triggerMR\",\n" +
|
||||||
|
" \"trigger\" : \"node_added_triggerMR\",\n" +
|
||||||
|
" \"stage\" : \"STARTED\",\n" +
|
||||||
|
" \"class\" : \"" + AssertingListener.class.getName() + "\"\n" +
|
||||||
|
" }\n" +
|
||||||
|
"}"
|
||||||
|
);
|
||||||
assertAutoscalingUpdateComplete();
|
assertAutoscalingUpdateComplete();
|
||||||
|
|
||||||
overseerLeader = cluster.getSimClusterStateProvider().simGetOverseerLeader();
|
overseerLeader = cluster.getSimClusterStateProvider().simGetOverseerLeader();
|
||||||
|
|
||||||
// create another node
|
// create another node
|
||||||
|
@ -987,41 +1016,51 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1;
|
pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1;
|
||||||
assertTrue("Path " + pathAdded + " wasn't created", cluster.getDistribStateManager().hasData(pathAdded));
|
assertTrue("Path " + pathAdded + " wasn't created", cluster.getDistribStateManager().hasData(pathAdded));
|
||||||
|
|
||||||
|
listenerEventLatch.countDown(); // let the trigger thread continue
|
||||||
|
|
||||||
|
assertTrue(triggerFiredLatch.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// kill this node
|
||||||
listener.reset();
|
listener.reset();
|
||||||
events.clear();
|
events.clear();
|
||||||
// one nodeAdded (not cleared yet) and one nodeLost
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
triggerFiredLatch = new CountDownLatch(2);
|
|
||||||
|
cluster.simRemoveNode(node1, true);
|
||||||
|
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
|
||||||
|
fail("onChange listener didn't execute on cluster change");
|
||||||
|
}
|
||||||
|
assertEquals(1, listener.lostNodes.size());
|
||||||
|
assertEquals(node1, listener.lostNodes.iterator().next());
|
||||||
|
// verify that a znode exists
|
||||||
|
String pathLost2 = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + node1;
|
||||||
|
assertTrue("Path " + pathLost2 + " wasn't created", cluster.getDistribStateManager().hasData(pathLost2));
|
||||||
|
|
||||||
|
listenerEventLatch.countDown(); // let the trigger thread continue
|
||||||
|
|
||||||
|
assertTrue(triggerFiredLatch.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// triggers don't remove markers
|
||||||
|
assertTrue("Path " + pathLost2 + " should still exist", cluster.getDistribStateManager().hasData(pathLost2));
|
||||||
|
|
||||||
|
listener.reset();
|
||||||
|
events.clear();
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
// kill overseer again
|
// kill overseer again
|
||||||
log.info("====== KILL OVERSEER 2");
|
log.info("====== KILL OVERSEER 2");
|
||||||
cluster.simRestartOverseer(overseerLeader);
|
cluster.simRemoveNode(overseerLeader, true);
|
||||||
assertTrue("cluster onChange listener didn't execute even after await()ing an excessive amount of time",
|
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
|
||||||
listener.onChangeLatch.await(60, TimeUnit.SECONDS));
|
fail("onChange listener didn't execute on cluster change");
|
||||||
|
|
||||||
assertAutoscalingUpdateComplete();
|
|
||||||
|
|
||||||
assertTrue("trigger did not fire event after await()ing an excessive amount of time",
|
|
||||||
triggerFiredLatch.await(60, TimeUnit.SECONDS));
|
|
||||||
assertEquals(2, events.size());
|
|
||||||
TriggerEvent nodeAdded = null;
|
|
||||||
TriggerEvent nodeLost = null;
|
|
||||||
for (TriggerEvent ev : events) {
|
|
||||||
switch (ev.getEventType()) {
|
|
||||||
case NODEADDED:
|
|
||||||
nodeAdded = ev;
|
|
||||||
break;
|
|
||||||
case NODELOST:
|
|
||||||
nodeLost = ev;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
fail("unexpected event type: " + ev);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
assertNotNull("expected nodeAdded event", nodeAdded);
|
|
||||||
assertNotNull("expected nodeLost event", nodeLost);
|
|
||||||
List<String> nodeNames = (List<String>)nodeLost.getProperty(TriggerEvent.NODE_NAMES);
|
if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
|
||||||
|
fail("Trigger should have fired by now");
|
||||||
|
}
|
||||||
|
assertEquals(1, events.size());
|
||||||
|
TriggerEvent ev = events.iterator().next();
|
||||||
|
List<String> nodeNames = (List<String>) ev.getProperty(TriggerEvent.NODE_NAMES);
|
||||||
assertTrue(nodeNames.contains(overseerLeader));
|
assertTrue(nodeNames.contains(overseerLeader));
|
||||||
nodeNames = (List<String>)nodeAdded.getProperty(TriggerEvent.NODE_NAMES);
|
assertEquals(TriggerEventType.NODELOST, ev.getEventType());
|
||||||
assertTrue(nodeNames.contains(node1));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static final Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
|
static final Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
|
||||||
|
|
|
@ -522,6 +522,7 @@ request node deletion.
|
||||||
}
|
}
|
||||||
----
|
----
|
||||||
|
|
||||||
|
[[scheduledtrigger]]
|
||||||
=== Scheduled Trigger
|
=== Scheduled Trigger
|
||||||
|
|
||||||
The Scheduled trigger generates events according to a fixed rate schedule.
|
The Scheduled trigger generates events according to a fixed rate schedule.
|
||||||
|
@ -563,3 +564,46 @@ ever executing if a new scheduled event is ready as soon as the cooldown period
|
||||||
Solr randomizes the order in which the triggers are resumed after the cooldown period to mitigate this problem. However, it is recommended that scheduled triggers
|
Solr randomizes the order in which the triggers are resumed after the cooldown period to mitigate this problem. However, it is recommended that scheduled triggers
|
||||||
are not used with low `every` values and an external scheduling process such as cron be used for such cases instead.
|
are not used with low `every` values and an external scheduling process such as cron be used for such cases instead.
|
||||||
====
|
====
|
||||||
|
|
||||||
|
== Default Triggers
|
||||||
|
A fresh installation of SolrCloud always creates some default triggers. If these triggers are missing (eg. they were
|
||||||
|
deleted) they are re-created on any autoscaling configuration change or Overseer restart. These triggers can be
|
||||||
|
suspended if their functionality somehow interferes with other configuration but they can't be permanently deleted.
|
||||||
|
|
||||||
|
=== Auto-add Replicas Trigger
|
||||||
|
The default configuration and functionality of this trigger is described in detail in the
|
||||||
|
section titled <<solrcloud-autoscaling-auto-add-replicas.adoc#solrcloud-autoscaling-auto-add-replicas,Automatically Adding Replicas>>.
|
||||||
|
|
||||||
|
=== Scheduled Maintenance Trigger
|
||||||
|
This is a <<scheduledtrigger>> named `.scheduled_maintenance` and it's configured to run once per day.
|
||||||
|
It executes the following actions:
|
||||||
|
|
||||||
|
==== `solr.InactiveShardPlanAction`
|
||||||
|
This action checks existing collections for any shards in `INACTIVE` state, which indicates that they
|
||||||
|
are the original parent shards remaining after a successful `SPLITSHARD` operation.
|
||||||
|
|
||||||
|
These shards are not immediately deleted because shard splitting is a complex operation that may fail in
|
||||||
|
non-obvious ways, so keeping the original parent shard gives users a chance to recover from potential failures.
|
||||||
|
|
||||||
|
However, keeping these shards indefinitely doesn't make sense either because they still use system
|
||||||
|
resources (their Solr cores are still being loaded, and their indexes still occupy disk space).
|
||||||
|
This scheduled action is responsible for removing such inactive parent shards after their
|
||||||
|
time-to-live expires. By default the TTL is set to 48 hours after the shard state was set to
|
||||||
|
`INACTIVE`. When this TTL elapses this scheduled action requests that the shard be deleted, which is then
|
||||||
|
executed by `solr.ExecutePlanAction` that is configured for this trigger.
|
||||||
|
|
||||||
|
==== `solr.InactiveMarkersPlanAction`
|
||||||
|
When a node is lost or added an event is generated - but if the lost node was the one running
|
||||||
|
Overseer leader such event may not be properly processed by the triggers (which run in the Overseer leader context).
|
||||||
|
For this reason a special marker is created in ZooKeeper so that when the next Overseer leader is elected the
|
||||||
|
triggers will be able to learn about and process these past events.
|
||||||
|
|
||||||
|
Triggers don't delete these markers once they are done processing (because several triggers may need them and eg.
|
||||||
|
scheduled triggers may run at arbitrary times with arbitrary delays) so Solr needs a mechanism to clean up
|
||||||
|
old markers for such events so that they don't accumulate over time. This trigger action performs the clean-up
|
||||||
|
- it deletes markers older than the configured time-to-live (by default it's 48 hours).
|
||||||
|
|
||||||
|
=== `solr.ExecutePlanAction`
|
||||||
|
This action simply executes any collection admin requests generated by other
|
||||||
|
actions - in particular, in the default configuration it executes `DELETESHARD` requests produced by
|
||||||
|
`solr.InactiveShardPlanAction`, as described above.
|
Loading…
Reference in New Issue