diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 15f1151e98c..4c2dd0bfe95 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -73,6 +73,8 @@ New Features
 * SOLR-10376: Implement autoscaling trigger for nodeAdded event. (shalin)
 
+* SOLR-10396: Implement trigger support for nodeLost event type (Cao Manh Dat, shalin)
+
 Bug Fixes
 ----------------------
 
 * SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index eb98130b4b1..1e677b252b8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -120,6 +120,8 @@ public class AutoScaling {
       switch (type) {
         case NODEADDED:
           return new NodeAddedTrigger(name, props, coreContainer);
+        case NODELOST:
+          return new NodeLostTrigger(name, props, coreContainer);
         default:
           throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
       }
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
new file mode 100644
index 00000000000..7f8a0c28c7e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.core.CoreContainer;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Trigger for the {@link AutoScaling.EventType#NODELOST} event
+ */
+public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.NodeLostEvent> {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final String name;
+  private final Map<String, Object> properties;
+  private final CoreContainer container;
+  private final List<TriggerAction> actions;
+  private final AtomicReference<AutoScaling.TriggerListener<NodeLostEvent>> listenerRef;
+
+  private boolean isClosed = false;
+
+  private Set<String> lastLiveNodes;
+
+  private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
+
+  public NodeLostTrigger(String name, Map<String, Object> properties,
+                         CoreContainer container) {
+    this.name = name;
+    this.properties = properties;
+    this.container = container;
+    this.listenerRef = new AtomicReference<>();
+    List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
+    if (o != null && !o.isEmpty()) {
+      actions = new ArrayList<>(3);
+      for (Map<String, String> map : o) {
+        TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class);
+        action.init(map);
+        actions.add(action);
+      }
+    } else {
+      actions = Collections.emptyList();
+    }
+    lastLiveNodes = container.getZkController().getZkStateReader().getClusterState().getLiveNodes();
+    log.info("Initial livenodes: " + lastLiveNodes);
+  }
+
+  @Override
+  public void setListener(AutoScaling.TriggerListener<NodeLostEvent> listener) {
+    listenerRef.set(listener);
+  }
+
+  @Override
+  public AutoScaling.TriggerListener<NodeLostEvent> getListener() {
+    return listenerRef.get();
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public AutoScaling.EventType getEventType() {
+    return AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
+  }
+
+  @Override
+  public boolean isEnabled() {
+    return Boolean.parseBoolean((String) properties.getOrDefault("enabled", "true"));
+  }
+
+  @Override
+  public int getWaitForSecond() {
+    return ((Number) properties.getOrDefault("waitFor", -1L)).intValue();
+  }
+
+  @Override
+  public Map<String, Object> getProperties() {
+    return properties;
+  }
+
+  @Override
+  public List<TriggerAction> getActions() {
+    return actions;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof NodeLostTrigger) {
+      NodeLostTrigger that = (NodeLostTrigger) obj;
+      return this.name.equals(that.name)
+          && this.properties.equals(that.properties);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, properties);
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (this) {
+      isClosed = true;
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      synchronized (this) {
+        if (isClosed) {
+          log.warn("NodeLostTrigger ran but was already closed");
+          throw new RuntimeException("Trigger has been closed");
+        }
+      }
+      log.debug("Running NodeLostTrigger");
+
+      ZkStateReader reader = container.getZkController().getZkStateReader();
+      Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
+      log.info("Found livenodes: " + newLiveNodes);
+
+      // have any nodes that we were tracking rejoined the cluster?
+      // if so, remove them from the tracking map
+      Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
+      trackingKeySet.removeAll(newLiveNodes);
+
+      // have any nodes been removed?
+      Set<String> copyOfLastLiveNodes = new HashSet<>(lastLiveNodes);
+      copyOfLastLiveNodes.removeAll(newLiveNodes);
+      copyOfLastLiveNodes.forEach(n -> {
+        log.info("Tracking lost node: {}", n);
+        nodeNameVsTimeRemoved.put(n, System.nanoTime());
+      });
+
+      // has enough time expired to trigger events for a node?
+      List<String> nodesToStopTracking = new ArrayList<>();
+      for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
+        String nodeName = entry.getKey();
+        Long timeRemoved = entry.getValue();
+        if (TimeUnit.SECONDS.convert(System.nanoTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
+          // fire!
+          AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
+          if (listener != null) {
+            log.info("NodeLostTrigger firing registered listener");
+            listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName));
+          }
+          // defer removal from the tracking map until after iteration to avoid a ConcurrentModificationException
+          nodesToStopTracking.add(nodeName);
+        }
+      }
+      nodesToStopTracking.forEach(nodeNameVsTimeRemoved::remove);
+
+      lastLiveNodes = newLiveNodes;
+    } catch (RuntimeException e) {
+      log.error("Unexpected exception in NodeLostTrigger", e);
+    }
+  }
+
+  @Override
+  public boolean isClosed() {
+    synchronized (this) {
+      return isClosed;
+    }
+  }
+
+  public static class NodeLostEvent implements AutoScaling.TriggerEvent<NodeLostTrigger> {
+    private final NodeLostTrigger source;
+    private final long nodeLostNanoTime;
+    private final String nodeName;
+
+    private Map<String, Object> context;
+
+    public NodeLostEvent(NodeLostTrigger source, long nodeLostNanoTime, String nodeRemoved) {
+      this.source = source;
+      this.nodeLostNanoTime = nodeLostNanoTime;
+      this.nodeName = nodeRemoved;
+    }
+
+    @Override
+    public NodeLostTrigger getSource() {
+      return source;
+    }
+
+    @Override
+    public long getEventNanoTime() {
+      return nodeLostNanoTime;
+    }
+
+    public String getNodeName() {
+      return nodeName;
+    }
+
+    public AutoScaling.EventType getType() {
+      return source.getEventType();
+    }
+
+    @Override
+    public void setContext(Map<String, Object> context) {
+      this.context = context;
+    }
+
+    @Override
+    public Map<String, Object> getContext() {
+      return context;
+    }
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
new file mode 100644
index 00000000000..b64ba68ad0b
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.core.CoreContainer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link NodeLostTrigger}
+ */
+public class NodeLostTriggerTest extends SolrCloudTestCase {
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(5)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+  }
+
+  @Test
+  public void test() throws Exception {
+    CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
+    Map<String, Object> props = new HashMap<>();
+    props.put("event", "nodeLost");
+    long waitForSeconds = 1 + random().nextInt(5);
+    props.put("waitFor", waitForSeconds);
+    props.put("enabled", "true");
+    List<Map<String, String>> actions = new ArrayList<>(3);
+    Map<String, String> map = new HashMap<>(2);
+    map.put("name", "compute_plan");
+    map.put("class", "solr.ComputePlanAction");
+    actions.add(map);
+    map = new HashMap<>(2);
+    map.put("name", "execute_plan");
+    map.put("class", "solr.ExecutePlanAction");
+    actions.add(map);
+    map = new HashMap<>(2);
+    map.put("name", "log_plan");
+    map.put("class", "solr.LogPlanAction");
+    actions.add(map);
+    props.put("actions", actions);
+
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
+      trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+      trigger.run();
+      String lostNodeName = cluster.getJettySolrRunner(1).getNodeName();
+      cluster.stopJettySolrRunner(1);
+
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<NodeLostTrigger.NodeLostEvent> eventRef = new AtomicReference<>();
+      trigger.setListener(event -> {
+        if (fired.compareAndSet(false, true)) {
+          eventRef.set(event);
+          if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
+            fail("NodeLostListener was fired before the configured waitFor period");
+          }
+        } else {
+          fail("NodeLostListener was fired more than once!");
+        }
+      });
+      int counter = 0;
+      do {
+        trigger.run();
+        Thread.sleep(1000);
+        if (counter++ > 10) {
+          fail("Lost node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
+      assertNotNull(nodeLostEvent);
+      assertEquals("The node lost trigger fired for a different node",
+          lostNodeName, nodeLostEvent.getNodeName());
+    }
+
+    // remove a node but add it back before the waitFor period expires
+    // and assert that the trigger doesn't fire at all
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
+      final int waitTime = 2;
+      props.put("waitFor", waitTime);
+      trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+      trigger.run();
+
+      JettySolrRunner lostNode = cluster.getJettySolrRunner(1);
+      lostNode.stop();
+      AtomicBoolean fired = new AtomicBoolean(false);
+      trigger.setListener(event -> {
+        if (fired.compareAndSet(false, true)) {
+          if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
+            fail("NodeLostListener was fired before the configured waitFor period");
+          }
+        } else {
+          fail("NodeLostListener was fired more than once!");
+        }
+      });
+      trigger.run(); // this run should detect the lost node
+      int counter = 0;
+      do {
+        if (container.getZkController().getZkStateReader().getClusterState().getLiveNodes().size() == 3) {
+          break;
+        }
+        Thread.sleep(100);
+        if (counter++ > 20) {
+          fail("Live nodes not updated!");
+        }
+      } while (true);
+      counter = 0;
+      lostNode.start();
+      do {
+        trigger.run();
+        Thread.sleep(1000);
+        if (counter++ > waitTime + 1) { // run it a little more than the wait time
+          break;
+        }
+      } while (true);
+
+      // ensure the event was not fired
+      assertFalse(fired.get());
+    }
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index cb92680ce61..3e35392e0de 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -28,8 +28,11 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -38,23 +41,32 @@ import org.slf4j.LoggerFactory;
 /**
  * An end-to-end integration test for triggers
  */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
 public class TriggerIntegrationTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private static CountDownLatch actionCreated = new CountDownLatch(1);
-  private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
+  private static CountDownLatch actionCreated;
+  private static CountDownLatch triggerFiredLatch;
   private static int waitForSeconds = 1;
-  private static AtomicBoolean triggerFired = new AtomicBoolean(false);
-  private static AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
+  private static AtomicBoolean triggerFired;
+  private static AtomicReference<AutoScaling.TriggerEvent> eventRef;
 
   @BeforeClass
   public static void setupCluster() throws Exception {
-    configureCluster(1)
+    configureCluster(2)
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
     waitForSeconds = 1 + random().nextInt(3);
   }
 
+  @Before
+  public void setupTest() {
+    actionCreated = new CountDownLatch(1);
+    triggerFiredLatch = new CountDownLatch(1);
+    triggerFired = new AtomicBoolean(false);
+    eventRef = new AtomicReference<>();
+  }
+
   @Test
   public void testNodeAddedTrigger() throws Exception {
     CloudSolrClient solrClient = cluster.getSolrClient();
@@ -81,12 +93,54 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
     assertTrue(triggerFired.get());
-    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
         newNode.getNodeName(), nodeAddedEvent.getNodeName());
   }
 
+  @Test
+  public void testNodeLostTrigger() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    // todo nocommit -- add testing for the v2 path
+    // String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
+    String path = "/admin/autoscaling";
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : 'true'," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+    String overseerLeader = (String) overSeerStatus.get("leader");
+    int nonOverseerLeaderIndex = 0;
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+      if (!jetty.getNodeName().equals(overseerLeader)) {
+        nonOverseerLeaderIndex = i;
+      }
+    }
+    SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionCreated.await(3, TimeUnit.SECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    String lostNodeName = cluster.getJettySolrRunner(nonOverseerLeaderIndex).getNodeName();
+    cluster.stopJettySolrRunner(nonOverseerLeaderIndex);
+    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
+    assertNotNull(nodeLostEvent);
+    assertEquals("The node lost trigger was fired but for a different node",
+        lostNodeName, nodeLostEvent.getNodeName());
+  }
+
   public static class TestTriggerAction implements TriggerAction {
 
     public TestTriggerAction() {
@@ -107,7 +161,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     @Override
     public void process(AutoScaling.TriggerEvent event) {
       if (triggerFired.compareAndSet(false, true)) {
-        eventRef.set((NodeAddedTrigger.NodeAddedEvent) event);
+        eventRef.set(event);
         if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
           fail("NodeAddedListener was fired before the configured waitFor period");
         }
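
Usage note (not part of the patch): the sketch below shows how a nodeLost trigger might be registered through the autoscaling handler once this change is applied. It mirrors the set-trigger command built in testNodeLostTrigger() above; the AutoScalingHandlerTest.AutoScalingRequest helper, the /admin/autoscaling path, and the solr.ComputePlanAction / solr.ExecutePlanAction / solr.LogPlanAction class names are taken from the tests in this patch, while the 30s wait period and the surrounding test-style client setup are only illustrative assumptions.

    // Illustrative sketch, written in the style of the integration test above.
    // Registers a nodeLost trigger via the /admin/autoscaling endpoint; the
    // waitFor value and the chosen action list are assumptions, not values
    // mandated by the patch.
    CloudSolrClient solrClient = cluster.getSolrClient();
    String setTriggerCommand = "{" +
        "'set-trigger' : {" +
        "'name' : 'node_lost_trigger'," +
        "'event' : 'nodeLost'," +
        "'waitFor' : '30s'," +
        "'enabled' : 'true'," +
        "'actions' : [" +
        "  {'name':'compute_plan','class':'solr.ComputePlanAction'}," +
        "  {'name':'execute_plan','class':'solr.ExecutePlanAction'}," +
        "  {'name':'log_plan','class':'solr.LogPlanAction'}" +
        "]" +
        "}}";
    SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST,
        "/admin/autoscaling", setTriggerCommand);
    NamedList<Object> response = solrClient.request(req);
    // The handler reports {"result": "success"} once the trigger definition is stored.
    assertEquals("success", response.get("result").toString());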