SOLR-11912: Improve TriggerIntegrationTest to avoid side-effects between tests (spurious autoscaling events).

Add a convenience method to DistribStateManager to remove data recursively.
This commit is contained in:
Andrzej Bialecki 2018-02-15 20:44:00 +01:00
parent 250e5b2aba
commit 26a037636d
13 changed files with 221 additions and 54 deletions

View File

@ -468,6 +468,10 @@ public class Overseer implements SolrCloseable {
this.isClosed = true;
}
/**
 * Exposed for tests.
 * @lucene.internal
 * @return the wrapped overseer worker thread
 */
public Closeable getThread() {
  return this.thread;
}
/**
 * @return true once this holder has been marked closed
 */
public boolean isClosed() {
  return isClosed;
}
@ -566,6 +570,15 @@ public class Overseer implements SolrCloseable {
/**
 * Exposed for tests.
 * @lucene.internal
 * @return the overseer cluster-state updater thread
 */
public synchronized OverseerThread getUpdaterThread() {
  return this.updaterThread;
}
/**
 * Exposed for tests.
 * @lucene.internal
 * @return the autoscaling trigger thread
 */
public synchronized OverseerThread getTriggerThread() {
  return this.triggerThread;
}
public synchronized void close() {
if (closed) return;

View File

@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
@ -396,17 +397,11 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
// clean up old terms node
String termsPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms";
try {
if (stateManager.hasData(termsPath)) {
List<String> paths = stateManager.listData(termsPath);
for (String path : paths) {
stateManager.removeData(termsPath + "/" + path, -1);
}
stateManager.removeData(termsPath, -1);
}
stateManager.removeRecursively(termsPath, true, true);
} catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
} catch (KeeperException | IOException | BadVersionException e) {
} catch (KeeperException | IOException | NotEmptyException | BadVersionException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
}
try {

View File

@ -102,6 +102,15 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
log.debug("OverseerTriggerThread has been closed explicitly");
}
/**
 * Visible for testing only.
 * @lucene.internal
 * @return the current {@link ScheduledTriggers} instance
 */
public ScheduledTriggers getScheduledTriggers() {
  return this.scheduledTriggers;
}
@Override
public boolean isClosed() {
return isClosed;

View File

@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@ -59,7 +58,6 @@ import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.zookeeper.Op;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -349,8 +347,9 @@ public class ScheduledTriggers implements Closeable {
/**
* Pauses all scheduled trigger invocations without interrupting any that are in progress
* @lucene.internal
*/
private synchronized void pauseTriggers() {
public synchronized void pauseTriggers() {
if (log.isDebugEnabled()) {
log.debug("Pausing all triggers: {}", scheduledTriggers.keySet());
}
@ -360,8 +359,9 @@ public class ScheduledTriggers implements Closeable {
/**
* Resumes all previously cancelled triggers to be scheduled after the given initial delay
* @param afterDelayMillis the initial delay in milliseconds after which triggers should be resumed
* @lucene.internal
*/
private synchronized void resumeTriggers(long afterDelayMillis) {
public synchronized void resumeTriggers(long afterDelayMillis) {
scheduledTriggers.forEach((s, scheduledTrigger) -> {
if (scheduledTrigger.scheduledFuture.isCancelled()) {
log.debug("Resuming trigger: {} after {}ms", s, afterDelayMillis);
@ -430,6 +430,17 @@ public class ScheduledTriggers implements Closeable {
}
}
/**
 * Remove and stop all triggers. Also cleans up any leftover
 * state / events in ZK.
 * <p>Intended for tests that need a clean autoscaling slate.</p>
 */
public synchronized void removeAll() {
  getScheduledTriggerNames().forEach(t -> {
    // parameterized logging instead of eager string concatenation
    log.info("-- removing trigger: {}", t);
    remove(t);
  });
}
/**
* Removes and stops the trigger with the given name. Also cleans up any leftover
* state / events in ZK.
@ -446,26 +457,12 @@ public class ScheduledTriggers implements Closeable {
String statePath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName;
String eventsPath = ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName;
try {
if (stateManager.hasData(statePath)) {
stateManager.removeData(statePath, -1);
}
} catch (NoSuchElementException e) {
// already removed by someone else
stateManager.removeRecursively(statePath, true, true);
} catch (Exception e) {
log.warn("Failed to remove state for removed trigger " + statePath, e);
}
try {
if (stateManager.hasData(eventsPath)) {
List<String> events = stateManager.listData(eventsPath);
List<Op> ops = new ArrayList<>(events.size() + 1);
events.forEach(ev -> {
ops.add(Op.delete(eventsPath + "/" + ev, -1));
});
ops.add(Op.delete(eventsPath, -1));
stateManager.multi(ops);
}
} catch (NoSuchElementException e) {
// already removed by someone else
stateManager.removeRecursively(eventsPath, true, true);
} catch (Exception e) {
log.warn("Failed to remove events for removed trigger " + eventsPath, e);
}

View File

@ -44,6 +44,7 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
@ -124,10 +125,30 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Before
public void setupTest() throws Exception {
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
String overseerLeader = (String) overSeerStatus.get("leader");
int overseerLeaderIndex = 0;
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
if (jetty.getNodeName().equals(overseerLeader)) {
overseerLeaderIndex = i;
break;
}
}
Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread)overseer.getTriggerThread().getThread()).getScheduledTriggers();
// aggressively remove all active scheduled triggers
scheduledTriggers.removeAll();
// clear any persisted auto scaling configuration
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
// restart Overseer. Even though we reset the autoscaling config some already running
// trigger threads may still continue to execute and produce spurious events
cluster.stopJettySolrRunner(overseerLeaderIndex);
Thread.sleep(5000);
throttlingDelayMs.set(TimeUnit.SECONDS.toMillis(ScheduledTriggers.DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS));
waitForSeconds = 1 + random().nextInt(3);
actionConstructorCalled = new CountDownLatch(1);
@ -145,26 +166,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// lets start a node
cluster.startJettySolrRunner();
}
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
// clear any events or markers
// todo: consider the impact of such cleanup on regular cluster restarts
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
}
private void deleteChildrenRecursively(String path) throws Exception {
List<String> paths = zkClient().getChildren(path, null, true);
paths.forEach(n -> {
try {
ZKUtil.deleteRecursive(zkClient().getSolrZooKeeper(), path + "/" + n);
} catch (KeeperException.NoNodeException e) {
// ignore
} catch (KeeperException | InterruptedException e) {
log.warn("Error deleting old data", e);
}
});
cloudManager.getDistribStateManager().removeRecursively(path, true, false);
}
@Test

View File

@ -39,6 +39,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.common.cloud.ZkStateReader;
@ -469,7 +470,7 @@ public class SimDistribStateManager implements DistribStateManager {
}
@Override
public void removeData(String path, int version) throws NoSuchElementException, BadVersionException, IOException {
public void removeData(String path, int version) throws NoSuchElementException, NotEmptyException, BadVersionException, IOException {
multiLock.lock();
try {
Node n = traverse(path, false, CreateMode.PERSISTENT);
@ -480,6 +481,9 @@ public class SimDistribStateManager implements DistribStateManager {
if (parent == null) {
throw new IOException("Cannot remove root node");
}
if (!n.children.isEmpty()) {
throw new NotEmptyException(path);
}
parent.removeChild(n.name, version);
} finally {
multiLock.unlock();

View File

@ -187,16 +187,7 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
if (!cluster.getDistribStateManager().hasData(path)) {
return;
}
List<String> children = cluster.getDistribStateManager().listData(path);
for (String c : children) {
if (cluster.getDistribStateManager().hasData(path + "/" + c)) {
try {
cluster.getDistribStateManager().removeData(path + "/" + c, -1);
} catch (NoSuchElementException e) {
// ignore
}
}
}
cluster.getDistribStateManager().removeRecursively(path, true, false);
}
/* Cluster helper methods ************************************/

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud.autoscaling.sim;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.NoSuchElementException;
@ -29,6 +30,7 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.client.solrj.impl.ZkDistribStateManager;
import org.apache.solr.cloud.ZkTestServer;
@ -120,6 +122,7 @@ public class TestDistribStateManager extends SolrTestCaseJ4 {
assertFalse(stateManager.hasData("/hasData/bar"));
try {
stateManager.createData("/hasData/foo", new byte[0], CreateMode.PERSISTENT);
fail("should have failed (parent /hasData doesn't exist)");
} catch (NoSuchElementException e) {
// expected
}
@ -130,6 +133,43 @@ public class TestDistribStateManager extends SolrTestCaseJ4 {
assertTrue(stateManager.hasData("/hasData/bar"));
}
/**
 * Verifies single-node removal semantics ({@link NotEmptyException} when a node
 * still has children) and the recursive variants of removeRecursively with
 * different ignoreMissing / includeRoot combinations.
 */
@Test
public void testRemoveData() throws Exception {
// precondition: none of the test paths exist yet
assertFalse(stateManager.hasData("/removeData/foo"));
assertFalse(stateManager.hasData("/removeData/foo/bar"));
assertFalse(stateManager.hasData("/removeData/baz"));
assertFalse(stateManager.hasData("/removeData/baz/1/2/3"));
// makePath creates all missing intermediate nodes
stateManager.makePath("/removeData/foo/bar");
stateManager.makePath("/removeData/baz/1/2/3");
assertTrue(stateManager.hasData("/removeData/foo"));
assertTrue(stateManager.hasData("/removeData/foo/bar"));
assertTrue(stateManager.hasData("/removeData/baz/1/2/3"));
try {
// removing a node that still has children must fail
stateManager.removeData("/removeData/foo", -1);
fail("should have failed (node has children)");
} catch (NotEmptyException e) {
// expected
}
// removing leaf first, then the (now empty) parent, succeeds
stateManager.removeData("/removeData/foo/bar", -1);
stateManager.removeData("/removeData/foo", -1);
// test recursive listing and removal
// includeRoot=false: children of /removeData/baz/1 are gone, the node itself survives
stateManager.removeRecursively("/removeData/baz/1", false, false);
assertFalse(stateManager.hasData("/removeData/baz/1/2"));
assertTrue(stateManager.hasData("/removeData/baz/1"));
// should silently ignore
stateManager.removeRecursively("/removeData/baz/1/2", true, true);
// includeRoot=true removes the root node as well
stateManager.removeRecursively("/removeData/baz/1", false, true);
assertFalse(stateManager.hasData("/removeData/baz/1"));
try {
// ignoreMissing=false on a missing path must throw
stateManager.removeRecursively("/removeData/baz/1", false, true);
fail("should throw exception - missing path");
} catch (NoSuchElementException e) {
// expected
}
// final cleanup: remove everything under and including /removeData
stateManager.removeRecursively("/removeData", true, true);
assertFalse(stateManager.hasData("/removeData"));
}
@Test
public void testListData() throws Exception {
assertFalse(stateManager.hasData("/listData/foo"));

View File

@ -1079,9 +1079,10 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
// wait for listener to capture the SUCCEEDED stage
cluster.getTimeSource().sleep(1000);
cluster.getTimeSource().sleep(5000);
List<CapturedEvent> capturedEvents = listenerEvents.get("bar");
assertNotNull("no events for 'bar'!", capturedEvents);
// we may get a few IGNORED events if other tests caused events within cooldown period
assertTrue(capturedEvents.toString(), capturedEvents.size() > 0);
long prevTimestamp = capturedEvents.get(capturedEvents.size() - 1).timestamp;

View File

@ -75,7 +75,7 @@ public class DelegatingDistribStateManager implements DistribStateManager {
}
@Override
public void removeData(String path, int version) throws NoSuchElementException, IOException, BadVersionException, KeeperException, InterruptedException {
public void removeData(String path, int version) throws NoSuchElementException, NotEmptyException, IOException, BadVersionException, KeeperException, InterruptedException {
delegate.removeData(path, version);
}

View File

@ -17,6 +17,10 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
@ -62,7 +66,7 @@ public interface DistribStateManager extends SolrCloseable {
*/
String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException;
void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException, BadVersionException;
void removeData(String path, int version) throws NoSuchElementException, IOException, NotEmptyException, KeeperException, InterruptedException, BadVersionException;
void setData(String path, byte[] data, int version) throws BadVersionException, NoSuchElementException, IOException, KeeperException, InterruptedException;
@ -73,4 +77,67 @@ public interface DistribStateManager extends SolrCloseable {
default AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
return getAutoScalingConfig(null);
}
/**
 * List a subtree including the root path, using breadth-first traversal.
 * @param root root path
 * @return list of full paths, with the root path being the first element
 */
default List<String> listTree(String root) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
  // normalize to an absolute path
  String normalizedRoot = root.startsWith("/") ? root : "/" + root;
  List<String> result = new ArrayList<>();
  Deque<String> pending = new LinkedList<>();
  result.add(normalizedRoot);
  pending.add(normalizedRoot);
  // classic BFS: drain the queue, appending each child in discovery order
  while (!pending.isEmpty()) {
    String current = pending.removeFirst();
    for (String child : listData(current)) {
      String childPath = current + "/" + child;
      result.add(childPath);
      pending.add(childPath);
    }
  }
  return result;
}
/**
 * Remove data recursively.
 * @param root root path
 * @param ignoreMissing ignore errors if root or any children path is missing
 * @param includeRoot when true delete also the root path
 */
default void removeRecursively(String root, boolean ignoreMissing, boolean includeRoot) throws NoSuchElementException, IOException, NotEmptyException, KeeperException, InterruptedException, BadVersionException {
  final List<String> paths;
  try {
    paths = listTree(root);
  } catch (NoSuchElementException e) {
    if (!ignoreMissing) {
      throw e;
    }
    return;
  }
  // children must go before parents: walk the BFS listing backwards
  for (int i = paths.size() - 1; i >= 0; i--) {
    String path = paths.get(i);
    // never delete "/"; skip the root itself unless requested
    if (path.equals("/") || (!includeRoot && path.equals(root))) {
      continue;
    }
    try {
      removeData(path, -1);
    } catch (NoSuchElementException e) {
      // concurrent removal by someone else is tolerated when ignoreMissing
      if (!ignoreMissing) {
        throw e;
      }
    }
  }
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
/**
 * Thrown when a node cannot be removed because it still has children,
 * analogous to ZooKeeper's {@code KeeperException.NotEmptyException}.
 */
public class NotEmptyException extends Exception {

  private final String path;

  /**
   * @param path path of the node that is not empty
   */
  public NotEmptyException(String path) {
    super("Path not empty: " + path);
    this.path = path;
  }

  /**
   * @return path of the node that could not be removed
   */
  public String getPath() {
    return path;
  }
}

View File

@ -27,6 +27,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
@ -125,11 +126,13 @@ public class ZkDistribStateManager implements DistribStateManager {
}
@Override
public void removeData(String path, int version) throws NoSuchElementException, BadVersionException, IOException, KeeperException, InterruptedException {
public void removeData(String path, int version) throws NoSuchElementException, BadVersionException, NotEmptyException, IOException, KeeperException, InterruptedException {
try {
zkClient.delete(path, version, true);
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException(path);
} catch (KeeperException.NotEmptyException e) {
throw new NotEmptyException(path);
} catch (KeeperException.BadVersionException e) {
throw new BadVersionException(version, path);
} catch (InterruptedException e) {