SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe.

commit 744b419b42
parent 6c9391df01
solr/CHANGES.txt
@@ -108,6 +108,9 @@ Bug Fixes
 * SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a
   SolrResponse node is in the overseer queue. (Jessica Cheng Mallet via shalin)
 
+* SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe. (Scott Blum, hoss,
+  sarowe, Erick Erickson, Mark Miller, shalin)
+
 Optimizations
 ----------------------
 
 * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.
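The CHANGES entry is terse, so a word on the bug before the per-file diffs. refreshLiveNodes(Watcher) can be invoked concurrently from multiple ZooKeeper watcher threads, and nothing tied the order of the getChildren() fetches to the order in which their results were published. A minimal sketch of that racy shape (LiveNodesCache and fetchLiveNodesFromZk() are invented stand-ins, not Solr classes):

import java.util.Collections;
import java.util.Set;

// Illustrative sketch only, NOT the patched code.
abstract class LiveNodesCache {
  protected volatile Set<String> liveNodes = Collections.emptySet();

  // Stand-in for zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true).
  protected abstract Set<String> fetchLiveNodesFromZk();

  // RACY: two watcher threads may interleave as fetch(A), fetch(B), publish(B),
  // publish(A); the stale snapshot A then stays cached until the next watcher
  // event fires (or forever, if no further live_nodes changes occur).
  void refreshUnsafe() {
    Set<String> fetched = fetchLiveNodesFromZk();
    liveNodes = fetched; // publish; may clobber a newer concurrent fetch
  }
}

The patch closes that window in two steps, visible in the ZkStateReader hunks below: a private lock linearizes the fetches, and an AtomicReference hand-off applied under the update lock guarantees that only the newest snapshot wins.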
solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java (new file)
@@ -0,0 +1,252 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.util.DefaultSolrThreadFactory;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

import org.junit.BeforeClass;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stress test of LiveNodes watching.
 *
 * Does bursts of adds to live_nodes using parallel threads, and verifies that after each
 * burst the ZkStateReader detects the correct set.
 */
@Slow
public class TestStressLiveNodes extends SolrCloudTestCase {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** A basic cloud client; we'll be testing the behavior of its ZkStateReader. */
  private static CloudSolrClient CLOUD_CLIENT;

  /** The address of the zk server used in this test. */
  private static String ZK_SERVER_ADDR;

  /** How many seconds we're willing to wait for our executor tasks to finish before failing the test. */
  private final static int WAIT_TIME = TEST_NIGHTLY ? 60 : 30;

  @BeforeClass
  public static void createMiniSolrCloudCluster() throws Exception {

    // We only need 1 node, and we don't care about any configs or collections:
    // we're going to fake all the live_nodes changes ourselves.
    configureCluster(1).configure();

    // Give all nodes a chance to come alive.
    TestTolerantUpdateProcessorCloud.assertSpinLoopAllJettyAreRunning(cluster);

    CLOUD_CLIENT = cluster.getSolrClient();
    CLOUD_CLIENT.connect(); // force connection even though we aren't sending any requests

    ZK_SERVER_ADDR = cluster.getZkServer().getZkAddress();
  }

  private static SolrZkClient newSolrZkClient() {
    assertNotNull(ZK_SERVER_ADDR);
    // CloudConfigBuilder.DEFAULT_ZK_CLIENT_TIMEOUT is private, so hardcode a 15 second timeout here.
    return new SolrZkClient(ZK_SERVER_ADDR, 15000);
  }

  /** Returns the true set of live nodes (currently in zk) as a sorted list. */
  private static List<String> getTrueLiveNodesFromZk() throws Exception {
    SolrZkClient client = newSolrZkClient();
    try {
      ArrayList<String> result = new ArrayList<>(client.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null, true));
      Collections.sort(result);
      return result;
    } finally {
      client.close();
    }
  }

  /**
   * Returns the cached set of live nodes (according to the ZkStateReader in our CloudSolrClient)
   * as a sorted list.
   * This is done in a sleep+retry loop until the result matches the expectedCount, or a few iters
   * have passed (this way we aren't testing how fast the watchers complete, just that they got
   * the correct result).
   */
  private static List<String> getCachedLiveNodesFromLocalState(final int expectedCount) throws Exception {
    ArrayList<String> result = null;

    for (int i = 0; i < 10; i++) {
      result = new ArrayList<>(CLOUD_CLIENT.getZkStateReader().getClusterState().getLiveNodes());
      if (expectedCount != result.size()) {
        log.info("sleeping #{} to give watchers a chance to finish: {} != {}",
                 i, expectedCount, result.size());
        Thread.sleep(200);
      } else {
        break;
      }
    }
    if (expectedCount != result.size()) {
      log.error("gave up waiting for live nodes to match expected size: {} != {}",
                expectedCount, result.size());
    }
    Collections.sort(result);
    return result;
  }

  public void testStress() throws Exception {

    // Do many iters, so we have "bursts" of adding nodes that we then check.
    final int numIters = atLeast(1000);
    for (int iter = 0; iter < numIters; iter++) {

      // Sanity check that ZK says there is in fact 1 live node.
      List<String> actualLiveNodes = getTrueLiveNodesFromZk();
      assertEquals("iter"+iter+": " + actualLiveNodes.toString(),
                   1, actualLiveNodes.size());

      // Only here do we forcibly update the cached live nodes, so we don't have to wait for it
      // to catch up with all the ephemeral nodes that vanished after the last iteration.
      CLOUD_CLIENT.getZkStateReader().updateLiveNodes();

      // Sanity check that our cloud client's local state knows about the 1 (real) live node in our cluster.
      List<String> cachedLiveNodes = getCachedLiveNodesFromLocalState(actualLiveNodes.size());
      assertEquals("iter"+iter+" " + actualLiveNodes.size() + " != " + cachedLiveNodes.size(),
                   actualLiveNodes, cachedLiveNodes);

      // Start spinning up some threads to add some live_node children in parallel.

      // We don't need a lot of threads or nodes (we don't want to swamp the CPUs, just bursts
      // of concurrent adds), but we do want to randomize it a bit so we increase the odds of
      // concurrent watchers firing regardless of the num CPUs or load on the machine running
      // the test (but we deliberately don't look at availableProcessors() since we want
      // randomization consistency across all machines for a given seed).
      final int numThreads = TestUtil.nextInt(random(), 2, 5);

      // Use the same num for all thrashers, to increase the likelihood of them all competing
      // (diff random numbers would mean heavy concurrency only for ~ the first N=lowest num requests).
      //
      // This does not need to be a large number -- in fact, the higher it is, the more
      // likely we are to see a mistake in early watcher triggers get "corrected" by a later one
      // and overlook a possible bug.
      final int numNodesPerThrasher = TestUtil.nextInt(random(), 1, 5);

      log.info("preparing parallel adds to live nodes: iter={}, numThreads={} numNodesPerThread={}",
               iter, numThreads, numNodesPerThrasher);

      // NOTE: the thrashers use ephemeral nodes, so we can't close any of them until
      // we are done with our assertions.
      final List<LiveNodeTrasher> thrashers = new ArrayList<>(numThreads);
      for (int i = 0; i < numThreads; i++) {
        thrashers.add(new LiveNodeTrasher("T"+iter+"_"+i, numNodesPerThrasher));
      }
      try {
        final ExecutorService executorService = ExecutorUtil.newMDCAwareFixedThreadPool
          (thrashers.size()+1, new DefaultSolrThreadFactory("test_live_nodes_thrasher_iter"+iter));

        executorService.invokeAll(thrashers);
        executorService.shutdown();
        if (! executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
          for (LiveNodeTrasher thrasher : thrashers) {
            thrasher.stop();
          }
        }
        assertTrue("iter"+iter+": thrashers didn't finish even after explicitly stopping",
                   executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS));

        // Sanity check that the *real* live_nodes entries in ZK match what the thrashers added.
        int totalAdded = 1; // 1 real live node when we started
        for (LiveNodeTrasher thrasher : thrashers) {
          totalAdded += thrasher.getNumAdded();
        }
        actualLiveNodes = getTrueLiveNodesFromZk();
        assertEquals("iter"+iter, totalAdded, actualLiveNodes.size());

        // Verify our local client knows the correct set of live nodes.
        cachedLiveNodes = getCachedLiveNodesFromLocalState(actualLiveNodes.size());
        assertEquals("iter"+iter+" " + actualLiveNodes.size() + " != " + cachedLiveNodes.size(),
                     actualLiveNodes, cachedLiveNodes);

      } finally {
        for (LiveNodeTrasher thrasher : thrashers) {
          // Shut down our zk connection, freeing our ephemeral nodes.
          thrasher.close();
        }
      }
    }
  }

  /** NOTE: has an internal counter which is not thread safe; only call() in one thread at a time. */
  public static final class LiveNodeTrasher implements Callable<Integer> {
    private final String id;
    private final int numNodesToAdd;
    private final SolrZkClient client;

    // 'running' is written by stop() from another thread, so it must be volatile for visibility.
    private volatile boolean running = false;
    private int numAdded = 0;

    /** ID should ideally be unique amongst any other instances. */
    public LiveNodeTrasher(String id, int numNodesToAdd) {
      this.id = id;
      this.numNodesToAdd = numNodesToAdd;
      this.client = newSolrZkClient();
    }

    /** Returns the number of nodes actually added w/o error. */
    public Integer call() {
      running = true;
      // NOTE: loop condition includes 'running' so stop() can short-circuit us.
      for (int i = 0; running && i < numNodesToAdd; i++) {
        final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/thrasher-" + id + "-" + i;
        try {
          client.makePath(nodePath, CreateMode.EPHEMERAL, true);
          numAdded++;
        } catch (Exception e) {
          log.error("failed to create: " + nodePath, e);
        }
      }
      return numAdded;
    }

    public int getNumAdded() {
      return numAdded;
    }

    /** Shuts down our zk connection, freeing any ephemeral nodes we created. */
    public void close() {
      client.close();
    }

    public void stop() {
      running = false;
    }
  }
}
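For reference, in this era of the codebase the new test runs through the Ant build. The invocation below uses the standard randomizedtesting flags of the time; treat it as an assumption about the build, not something this commit documents:

cd solr/core
ant test -Dtestcase=TestStressLiveNodes
# nightly mode raises WAIT_TIME (60s vs 30s) and scales up the atLeast(1000) iteration count
ant test -Dtestcase=TestStressLiveNodes -Dtests.nightly=true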

solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.common.Callable;
 import org.apache.solr.common.SolrException;

@@ -487,6 +488,10 @@ public class ZkStateReader implements Closeable {
         final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
         final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE);
         synchronized (getUpdateLock()) {
+          if (this.legacyClusterStateVersion >= stat.getVersion()) {
+            // Nothing to do, someone else updated same or newer.
+            return;
+          }
           this.legacyCollectionStates = loadedData.getCollectionStates();
           this.legacyClusterStateVersion = stat.getVersion();
         }

@@ -509,6 +514,9 @@ public class ZkStateReader implements Closeable {
     }
   }
 
+  // We don't get a Stat or track versions on getChildren() calls, so force linearization.
+  private final Object refreshCollectionListLock = new Object();
+
   /**
    * Search for any lazy-loadable state format2 collections.
    *
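It is worth spelling out why this hunk introduces a lock while the -487 hunk above got away with a version check: getData() hands back a Stat, so a stale legacy clusterstate update can be detected and dropped, but the getChildren() call used here (through SolrZkClient) yields only the child list, leaving nothing to compare. A sketch of that asymmetry against the raw org.apache.zookeeper.ZooKeeper API (standard methods, shown purely for illustration):

import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

class VersionedVsUnversionedReads {
  void demo(ZooKeeper zk) throws KeeperException, InterruptedException {
    // Versioned read: the populated Stat lets callers discard stale results.
    Stat stat = new Stat();
    byte[] data = zk.getData("/clusterstate.json", false, stat);
    int version = stat.getVersion();

    // Unversioned read: this overload populates no Stat, so the ordering of
    // concurrent fetch+apply must be imposed externally (hence the lock).
    List<String> children = zk.getChildren("/collections", false);
  }
}

Hence refreshCollectionList() below, and refreshLiveNodes() after it, both serialize fetch+record under a dedicated lock instead of comparing versions.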

@@ -522,29 +530,32 @@ public class ZkStateReader implements Closeable {
    * {@link ClusterState#getCollections()} method as a safeguard against exposing wrong collection names to the users
    */
   private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
-    List<String> children = null;
-    try {
-      children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
-    } catch (KeeperException.NoNodeException e) {
-      LOG.warn("Error fetching collection names: [{}]", e.getMessage());
-      // fall through
-    }
-    if (children == null || children.isEmpty()) {
-      lazyCollectionStates.clear();
-      return;
-    }
+    synchronized (refreshCollectionListLock) {
+      List<String> children = null;
+      try {
+        children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
+      } catch (KeeperException.NoNodeException e) {
+        LOG.warn("Error fetching collection names: [{}]", e.getMessage());
+        // fall through
+      }
+      if (children == null || children.isEmpty()) {
+        lazyCollectionStates.clear();
+        return;
+      }
 
-    // Don't mess with watchedCollections, they should self-manage.
+      // Don't lock getUpdateLock() here, we don't need it and it would cause deadlock.
+      // Don't mess with watchedCollections, they should self-manage.
 
-    // First, drop any children that disappeared.
-    this.lazyCollectionStates.keySet().retainAll(children);
-    for (String coll : children) {
-      // We will create an eager collection for any interesting collections, so don't add to lazy.
-      if (!interestingCollections.contains(coll)) {
-        // Double check contains just to avoid allocating an object.
-        LazyCollectionRef existing = lazyCollectionStates.get(coll);
-        if (existing == null) {
-          lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
+      // First, drop any children that disappeared.
+      this.lazyCollectionStates.keySet().retainAll(children);
+      for (String coll : children) {
+        // We will create an eager collection for any interesting collections, so don't add to lazy.
+        if (!interestingCollections.contains(coll)) {
+          // Double check contains just to avoid allocating an object.
+          LazyCollectionRef existing = lazyCollectionStates.get(coll);
+          if (existing == null) {
+            lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
+          }
+        }
       }
     }
   }
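One more note before the refreshLiveNodes() hunk: the comment "it would cause deadlock" refers to lock ordering. The patch is careful to release the fetch lock before getUpdateLock() is taken, because nesting two locks that some other path might acquire in the opposite order deadlocks both threads. A generic sketch of that failure mode (the two methods are hypothetical, not actual ZkStateReader code paths):

// Hypothetical illustration of a lock-ordering deadlock.
class LockOrderingDeadlock {
  private final Object refreshLock = new Object(); // stands in for the refresh locks
  private final Object updateLock = new Object();  // stands in for getUpdateLock()

  void threadA() {
    synchronized (refreshLock) {   // A holds refreshLock...
      synchronized (updateLock) {  // ...and blocks waiting for B
      }
    }
  }

  void threadB() {
    synchronized (updateLock) {    // B holds updateLock...
      synchronized (refreshLock) { // ...and blocks waiting for A
      }
    }
  }
}

The patched code never holds both locks at once; the AtomicReference carries the fetched snapshot from one critical section to the other.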

@@ -576,19 +587,35 @@ public class ZkStateReader implements Closeable {
     }
   }
 
+  // We don't get a Stat or track versions on getChildren() calls, so force linearization.
+  private final Object refreshLiveNodesLock = new Object();
+  // Ensures that only the latest getChildren fetch gets applied.
+  private final AtomicReference<Set<String>> lastFetchedLiveNodes = new AtomicReference<>();
+
   /**
    * Refresh live_nodes.
    */
   private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
-    Set<String> newLiveNodes;
-    try {
-      List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
-      newLiveNodes = new HashSet<>(nodeList);
-    } catch (KeeperException.NoNodeException e) {
-      newLiveNodes = emptySet();
+    synchronized (refreshLiveNodesLock) {
+      Set<String> newLiveNodes;
+      try {
+        List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
+        newLiveNodes = new HashSet<>(nodeList);
+      } catch (KeeperException.NoNodeException e) {
+        newLiveNodes = emptySet();
+      }
+      lastFetchedLiveNodes.set(newLiveNodes);
     }
-    Set<String> oldLiveNodes;
+
+    // Can't lock getUpdateLock() until we release the other, it would cause deadlock.
+    Set<String> oldLiveNodes, newLiveNodes;
     synchronized (getUpdateLock()) {
+      newLiveNodes = lastFetchedLiveNodes.getAndSet(null);
+      if (newLiveNodes == null) {
+        // Someone else won the race to apply the last update, just exit.
+        return;
+      }
+
       oldLiveNodes = this.liveNodes;
       this.liveNodes = newLiveNodes;
       if (clusterState != null) {
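Taken together, the patched refreshLiveNodes() boils down to: fetch under one lock, apply the latest snapshot exactly once under another. A condensed, self-contained sketch of that pattern (class and member names are mine, not Solr's):

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

abstract class LatestWinsCache {
  private final Object fetchLock = new Object();
  private final Object updateLock = new Object();
  private final AtomicReference<Set<String>> lastFetched = new AtomicReference<>();
  private Set<String> liveNodes = Collections.emptySet(); // guarded by updateLock

  protected abstract Set<String> fetchFromZk(); // stand-in for zkClient.getChildren()

  void refresh() {
    // Linearize fetches so snapshots are recorded in fetch order.
    synchronized (fetchLock) {
      lastFetched.set(fetchFromZk());
    }
    // Release fetchLock BEFORE taking updateLock (see the deadlock note above).
    synchronized (updateLock) {
      Set<String> newest = lastFetched.getAndSet(null);
      if (newest == null) {
        return; // another thread already applied a snapshot at least as new as ours
      }
      liveNodes = newest;
    }
  }
}

With this in place, a burst of concurrent watcher events (exactly what TestStressLiveNodes provokes) always converges on the set from the newest getChildren() result.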