mirror of
https://github.com/apache/lucene.git
synced 2025-02-07 18:49:03 +00:00
SOLR-10396: Implement trigger support for nodeLost event type
This commit is contained in:
parent
69c0765eb5
commit
81e0f801f5
@ -73,6 +73,8 @@ New Features
|
||||
|
||||
* SOLR-10376: Implement autoscaling trigger for nodeAdded event. (shalin)
|
||||
|
||||
* SOLR-10396: Implement trigger support for nodeLost event type. (Cao Manh Dat, shalin)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
* SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.
|
||||
|
@ -120,6 +120,8 @@ public class AutoScaling {
|
||||
switch (type) {
|
||||
case NODEADDED:
|
||||
return new NodeAddedTrigger(name, props, coreContainer);
|
||||
case NODELOST:
|
||||
return new NodeLostTrigger(name, props, coreContainer);
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
|
||||
}
|
||||
|
@ -0,0 +1,238 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Trigger for the {@link AutoScaling.EventType#NODELOST} event
|
||||
*/
|
||||
public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final String name;
|
||||
private final Map<String, Object> properties;
|
||||
private final CoreContainer container;
|
||||
private final List<TriggerAction> actions;
|
||||
private final AtomicReference<AutoScaling.TriggerListener<NodeLostEvent>> listenerRef;
|
||||
|
||||
private boolean isClosed = false;
|
||||
|
||||
private Set<String> lastLiveNodes;
|
||||
|
||||
private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
|
||||
|
||||
public NodeLostTrigger(String name, Map<String, Object> properties,
|
||||
CoreContainer container) {
|
||||
this.name = name;
|
||||
this.properties = properties;
|
||||
this.container = container;
|
||||
this.listenerRef = new AtomicReference<>();
|
||||
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
|
||||
if (o != null && !o.isEmpty()) {
|
||||
actions = new ArrayList<>(3);
|
||||
for (Map<String, String> map : o) {
|
||||
TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class);
|
||||
action.init(map);
|
||||
actions.add(action);
|
||||
}
|
||||
} else {
|
||||
actions = Collections.emptyList();
|
||||
}
|
||||
lastLiveNodes = container.getZkController().getZkStateReader().getClusterState().getLiveNodes();
|
||||
log.info("Initial livenodes: " + lastLiveNodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setListener(AutoScaling.TriggerListener<NodeLostEvent> listener) {
|
||||
listenerRef.set(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoScaling.TriggerListener<NodeLostEvent> getListener() {
|
||||
return listenerRef.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoScaling.EventType getEventType() {
|
||||
return AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return Boolean.parseBoolean((String) properties.getOrDefault("enabled", "true"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getWaitForSecond() {
|
||||
return ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TriggerAction> getActions() {
|
||||
return actions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof NodeLostTrigger) {
|
||||
NodeLostTrigger that = (NodeLostTrigger) obj;
|
||||
return this.name.equals(that.name)
|
||||
&& this.properties.equals(that.properties);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name, properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
synchronized (this) {
|
||||
isClosed = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
synchronized (this) {
|
||||
if (isClosed) {
|
||||
log.warn("NodeLostTrigger ran but was already closed");
|
||||
throw new RuntimeException("Trigger has been closed");
|
||||
}
|
||||
}
|
||||
log.debug("Running NodeLostTrigger");
|
||||
|
||||
ZkStateReader reader = container.getZkController().getZkStateReader();
|
||||
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
|
||||
log.info("Found livenodes: " + newLiveNodes);
|
||||
|
||||
// have any nodes that we were tracking been added to the cluster?
|
||||
// if so, remove them from the tracking map
|
||||
Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
|
||||
trackingKeySet.removeAll(newLiveNodes);
|
||||
|
||||
// have any nodes been removed?
|
||||
Set<String> copyOfLastLiveNodes = new HashSet<>(lastLiveNodes);
|
||||
copyOfLastLiveNodes.removeAll(newLiveNodes);
|
||||
copyOfLastLiveNodes.forEach(n -> {
|
||||
log.info("Tracking lost node: {}", n);
|
||||
nodeNameVsTimeRemoved.put(n, System.nanoTime());
|
||||
});
|
||||
|
||||
// has enough time expired to trigger events for a node?
|
||||
for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
|
||||
String nodeName = entry.getKey();
|
||||
Long timeRemoved = entry.getValue();
|
||||
if (TimeUnit.SECONDS.convert(System.nanoTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
|
||||
// fire!
|
||||
AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
|
||||
if (listener != null) {
|
||||
log.info("NodeLostTrigger firing registered listener");
|
||||
listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName));
|
||||
}
|
||||
trackingKeySet.remove(nodeName);
|
||||
}
|
||||
}
|
||||
|
||||
lastLiveNodes = newLiveNodes;
|
||||
} catch (RuntimeException e) {
|
||||
log.error("Unexpected exception in NodeLostTrigger", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
synchronized (this) {
|
||||
return isClosed;
|
||||
}
|
||||
}
|
||||
|
||||
public static class NodeLostEvent implements AutoScaling.TriggerEvent<NodeLostTrigger> {
|
||||
private final NodeLostTrigger source;
|
||||
private final long nodeLostNanoTime;
|
||||
private final String nodeName;
|
||||
|
||||
private Map<String, Object> context;
|
||||
|
||||
public NodeLostEvent(NodeLostTrigger source, long nodeLostNanoTime, String nodeRemoved) {
|
||||
this.source = source;
|
||||
this.nodeLostNanoTime = nodeLostNanoTime;
|
||||
this.nodeName = nodeRemoved;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeLostTrigger getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getEventNanoTime() {
|
||||
return nodeLostNanoTime;
|
||||
}
|
||||
|
||||
public String getNodeName() {
|
||||
return nodeName;
|
||||
}
|
||||
|
||||
public AutoScaling.EventType getType() {
|
||||
return source.getEventType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setContext(Map<String, Object> context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getContext() {
|
||||
return context;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,147 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test for {@link NodeLostTrigger}
|
||||
*/
|
||||
public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(5)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put("event", "nodeLost");
|
||||
long waitForSeconds = 1 + random().nextInt(5);
|
||||
props.put("waitFor", waitForSeconds);
|
||||
props.put("enabled", "true");
|
||||
List<Map<String, String>> actions = new ArrayList<>(3);
|
||||
Map<String, String> map = new HashMap<>(2);
|
||||
map.put("name", "compute_plan");
|
||||
map.put("class", "solr.ComputePlanAction");
|
||||
actions.add(map);
|
||||
map = new HashMap<>(2);
|
||||
map.put("name", "execute_plan");
|
||||
map.put("class", "solr.ExecutePlanAction");
|
||||
actions.add(map);
|
||||
map = new HashMap<>(2);
|
||||
map.put("name", "log_plan");
|
||||
map.put("class", "solr.LogPlanAction");
|
||||
actions.add(map);
|
||||
props.put("actions", actions);
|
||||
|
||||
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
|
||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
||||
trigger.run();
|
||||
String lostNodeName = cluster.getJettySolrRunner(1).getNodeName();
|
||||
cluster.stopJettySolrRunner(1);
|
||||
|
||||
AtomicBoolean fired = new AtomicBoolean(false);
|
||||
AtomicReference<NodeLostTrigger.NodeLostEvent> eventRef = new AtomicReference<>();
|
||||
trigger.setListener(event -> {
|
||||
if (fired.compareAndSet(false, true)) {
|
||||
eventRef.set(event);
|
||||
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
|
||||
fail("NodeLostListener was fired before the configured waitFor period");
|
||||
}
|
||||
} else {
|
||||
fail("NodeLostListener was fired more than once!");
|
||||
}
|
||||
});
|
||||
int counter = 0;
|
||||
do {
|
||||
trigger.run();
|
||||
Thread.sleep(1000);
|
||||
if (counter++ > 10) {
|
||||
fail("Lost node was not discovered by trigger even after 10 seconds");
|
||||
}
|
||||
} while (!fired.get());
|
||||
|
||||
NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
|
||||
assertNotNull(nodeLostEvent);
|
||||
assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
|
||||
|
||||
}
|
||||
|
||||
// remove a node but add it back before the waitFor period expires
|
||||
// and assert that the trigger doesn't fire at all
|
||||
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
|
||||
final int waitTime = 2;
|
||||
props.put("waitFor", waitTime);
|
||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
||||
trigger.run();
|
||||
|
||||
JettySolrRunner lostNode = cluster.getJettySolrRunner(1);
|
||||
lostNode.stop();
|
||||
AtomicBoolean fired = new AtomicBoolean(false);
|
||||
trigger.setListener(event -> {
|
||||
if (fired.compareAndSet(false, true)) {
|
||||
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
|
||||
fail("NodeLostListener was fired before the configured waitFor period");
|
||||
}
|
||||
} else {
|
||||
fail("NodeLostListener was fired more than once!");
|
||||
}
|
||||
});
|
||||
trigger.run(); // first run should detect the new node
|
||||
int counter = 0;
|
||||
do {
|
||||
if (container.getZkController().getZkStateReader().getClusterState().getLiveNodes().size() == 3) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
if (counter++ > 20) {
|
||||
fail("Live nodes not updated!");
|
||||
}
|
||||
} while (true);
|
||||
counter = 0;
|
||||
lostNode.start();
|
||||
do {
|
||||
trigger.run();
|
||||
Thread.sleep(1000);
|
||||
if (counter++ > waitTime + 1) { // run it a little more than the wait time
|
||||
break;
|
||||
}
|
||||
} while (true);
|
||||
|
||||
// ensure the event was not fired
|
||||
assertFalse(fired.get());
|
||||
}
|
||||
}
|
||||
}
|
@ -28,8 +28,11 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
@ -38,23 +41,32 @@ import org.slf4j.LoggerFactory;
|
||||
/**
|
||||
* An end-to-end integration test for triggers
|
||||
*/
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
|
||||
public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static CountDownLatch actionCreated = new CountDownLatch(1);
|
||||
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
|
||||
private static CountDownLatch actionCreated;
|
||||
private static CountDownLatch triggerFiredLatch;
|
||||
private static int waitForSeconds = 1;
|
||||
private static AtomicBoolean triggerFired = new AtomicBoolean(false);
|
||||
private static AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
|
||||
private static AtomicBoolean triggerFired;
|
||||
private static AtomicReference<AutoScaling.TriggerEvent> eventRef;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(1)
|
||||
configureCluster(2)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
waitForSeconds = 1 + random().nextInt(3);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupTest() {
|
||||
actionCreated = new CountDownLatch(1);
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
triggerFired = new AtomicBoolean(false);
|
||||
eventRef = new AtomicReference<>();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeAddedTrigger() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
@ -81,12 +93,54 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
assertTrue(triggerFired.get());
|
||||
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
|
||||
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
|
||||
assertNotNull(nodeAddedEvent);
|
||||
assertEquals("The node added trigger was fired but for a different node",
|
||||
newNode.getNodeName(), nodeAddedEvent.getNodeName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeLostTrigger() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
// todo nocommit -- add testing for the v2 path
|
||||
// String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
|
||||
String path = "/admin/autoscaling";
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_lost_trigger'," +
|
||||
"'event' : 'nodeLost'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : 'true'," +
|
||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||
String overseerLeader = (String) overSeerStatus.get("leader");
|
||||
int nonOverseerLeaderIndex = 0;
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||
if (!jetty.getNodeName().equals(overseerLeader)) {
|
||||
nonOverseerLeaderIndex = i;
|
||||
}
|
||||
}
|
||||
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
|
||||
fail("The TriggerAction should have been created by now");
|
||||
}
|
||||
|
||||
String lostNodeName = cluster.getJettySolrRunner(nonOverseerLeaderIndex).getNodeName();
|
||||
cluster.stopJettySolrRunner(nonOverseerLeaderIndex);
|
||||
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
assertTrue(triggerFired.get());
|
||||
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
|
||||
assertNotNull(nodeLostEvent);
|
||||
assertEquals("The node lost trigger was fired but for a different node",
|
||||
lostNodeName, nodeLostEvent.getNodeName());
|
||||
}
|
||||
|
||||
public static class TestTriggerAction implements TriggerAction {
|
||||
|
||||
public TestTriggerAction() {
|
||||
@ -107,7 +161,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||
@Override
|
||||
public void process(AutoScaling.TriggerEvent event) {
|
||||
if (triggerFired.compareAndSet(false, true)) {
|
||||
eventRef.set((NodeAddedTrigger.NodeAddedEvent) event);
|
||||
eventRef.set(event);
|
||||
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
|
||||
fail("NodeAddedListener was fired before the configured waitFor period");
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user