diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index b0d67864ad5..87b954861f4 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -74,6 +74,9 @@ Bug Fixes * SOLR-9246: If the JDBCStream sees an unknown column type it will now throw a detailed exception. (Dennis Gove) +* SOLR-9181: Fix some races in CollectionStateWatcher API (Alan Woodward, Scott + Blum) + Optimizations ---------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java index ad51614b19c..cb0bac54d01 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java @@ -18,6 +18,7 @@ package org.apache.solr.cloud.overseer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.lucene.util.IOUtils; import org.apache.solr.SolrTestCaseJ4; @@ -29,13 +30,14 @@ import org.apache.solr.cloud.ZkTestServer; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; -import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.Utils; public class ZkStateReaderTest extends SolrTestCaseJ4 { + private static final long TIMEOUT = 30; + /** Uses explicit refresh to ensure latest changes are visible. */ public void testStateFormatUpdateWithExplicitRefresh() throws Exception { testStateFormatUpdate(true, true); @@ -84,7 +86,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { { // create new collection with stateFormat = 1 - DocCollection stateV1 = new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE); + DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE); ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1); writer.enqueueUpdate(reader.getClusterState(), c1, null); writer.writePendingUpdates(); @@ -97,12 +99,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { if (explicitRefresh) { reader.forceUpdateCollection("c1"); } else { - for (int i = 0; i < 1000; ++i) { - if (reader.getClusterState().hasCollection("c1")) { - break; - } - Thread.sleep(50); - } + reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null); } DocCollection collection = reader.getClusterState().getCollection("c1"); @@ -112,7 +109,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { { // Now update the collection to stateFormat = 2 - DocCollection stateV2 = new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"); + DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"); ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2); writer.enqueueUpdate(reader.getClusterState(), c2, null); writer.writePendingUpdates(); @@ -125,12 +122,8 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { if (explicitRefresh) { reader.forceUpdateCollection("c1"); } else { - for (int i = 0; i < 1000; ++i) { - if (reader.getClusterState().getCollection("c1").getStateFormat() == 2) { - break; - } - Thread.sleep(50); - } + reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, + (n, c) -> c != null && c.getStateFormat() == 2); } DocCollection collection = reader.getClusterState().getCollection("c1"); @@ -166,7 +159,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { // create new collection with stateFormat = 2 ZkWriteCommand c1 = new ZkWriteCommand("c1", - new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json")); + new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json")); writer.enqueueUpdate(reader.getClusterState(), c1, null); writer.writePendingUpdates(); reader.forceUpdateCollection("c1"); @@ -216,7 +209,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { // create new collection with stateFormat = 2 - DocCollection state = new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json"); + DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json"); ZkWriteCommand wc = new ZkWriteCommand("c1", state); writer.enqueueUpdate(reader.getClusterState(), wc, null); writer.writePendingUpdates(); @@ -224,14 +217,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); //reader.forceUpdateCollection("c1"); - - for (int i = 0; i < 100; ++i) { - Thread.sleep(50); - ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); - if (ref != null) { - break; - } - } + reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null); ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); assertNotNull(ref); assertFalse(ref.isLazilyLoaded()); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index 978bdc00edf..52079942d18 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -197,7 +197,7 @@ public class DocCollection extends ZkNodeProps implements Iterable { @Override public String toString() { - return "DocCollection("+name+")=" + JSONUtil.toJSON(this); + return "DocCollection("+name+"/" + znode + "/" + znodeVersion + ")=" + JSONUtil.toJSON(this); } @Override @@ -259,4 +259,26 @@ public class DocCollection extends ZkNodeProps implements Iterable { } return replicas; } + + /** + * Get the shardId of a core on a specific node + */ + public String getShardId(String nodeName, String coreName) { + for (Slice slice : this) { + for (Replica replica : slice) { + if (Objects.equals(replica.getNodeName(), nodeName) && Objects.equals(replica.getCoreName(), coreName)) + return slice.getName(); + } + } + return null; + } + + @Override + public boolean equals(Object that) { + if (that instanceof DocCollection == false) + return false; + DocCollection other = (DocCollection) that; + return super.equals(that) && Objects.equals(this.znode, other.znode) && this.znodeVersion == other.znodeVersion; + } + } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index fe3922c43e4..16813198320 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -23,6 +23,7 @@ import java.net.URLDecoder; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -265,16 +266,19 @@ public class ZkStateReader implements Closeable { return; } // No need to set watchers because we should already have watchers registered for everything. + refreshCollectionList(null); + refreshLiveNodes(null); refreshLegacyClusterState(null); // Need a copy so we don't delete from what we're iterating over. Collection safeCopy = new ArrayList<>(watchedCollectionStates.keySet()); + Map updatedCollections = new HashMap<>(); for (String coll : safeCopy) { DocCollection newState = fetchCollectionState(coll, null); - updateWatchedCollection(coll, newState); + if (updateWatchedCollection(coll, newState)) { + updatedCollections.put(coll, newState); + } } - refreshCollectionList(null); - refreshLiveNodes(null); - constructState(); + constructState(updatedCollections); } } @@ -297,12 +301,10 @@ public class ZkStateReader implements Closeable { if (!legacyCollectionStates.containsKey(collection)) { // No dice, see if a new collection just got created. LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection); - if (tryLazyCollection.get() == null) { - // No dice, just give up. - return; + if (tryLazyCollection.get() != null) { + // What do you know, it exists! + lazyCollectionStates.putIfAbsent(collection, tryLazyCollection); } - // What do you know, it exists! - lazyCollectionStates.putIfAbsent(collection, tryLazyCollection); } } else if (ref.isLazilyLoaded()) { if (ref.get() != null) { @@ -313,9 +315,10 @@ public class ZkStateReader implements Closeable { } else if (watchedCollectionStates.containsKey(collection)) { // Exists as a watched collection, force a refresh. DocCollection newState = fetchCollectionState(collection, null); - updateWatchedCollection(collection, newState); + if (updateWatchedCollection(collection, newState)) { + constructState(Collections.singletonMap(collection, newState)); + } } - constructState(); } } @@ -337,7 +340,11 @@ public class ZkStateReader implements Closeable { DocCollection nu = getCollectionLive(this, coll); if (nu == null) return -1 ; if (nu.getZNodeVersion() > collection.getZNodeVersion()) { - updateWatchedCollection(coll, nu); + if (updateWatchedCollection(coll, nu)) { + synchronized (getUpdateLock()) { + constructState(Collections.singletonMap(coll, nu)); + } + } collection = nu; } } @@ -371,7 +378,7 @@ public class ZkStateReader implements Closeable { refreshCollectionList(new CollectionsChildWatcher()); synchronized (ZkStateReader.this.getUpdateLock()) { - constructState(); + constructState(Collections.emptyMap()); zkClient.exists(ALIASES, new Watcher() { @@ -463,8 +470,14 @@ public class ZkStateReader implements Closeable { /** * Construct the total state view from all sources. * Must hold {@link #getUpdateLock()} before calling this. + * + * @param changedCollections collections that have changed since the last call, + * and that should fire notifications */ - private void constructState() { + private void constructState(Map changedCollections) { + + Set liveNodes = this.liveNodes; // volatile read + // Legacy clusterstate is authoritative, for backwards compatibility. // To move a collection's state to format2, first create the new state2 format node, then remove legacy entry. Map result = new LinkedHashMap<>(legacyCollectionStates); @@ -480,6 +493,7 @@ public class ZkStateReader implements Closeable { } this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion); + LOG.debug("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]", legacyCollectionStates.keySet().size(), collectionWatches.keySet().size(), @@ -495,6 +509,10 @@ public class ZkStateReader implements Closeable { lazyCollectionStates.keySet(), clusterState.getCollectionStates()); } + + for (Map.Entry entry : changedCollections.entrySet()) { + notifyStateWatchers(liveNodes, entry.getKey(), entry.getValue()); + } } /** @@ -510,30 +528,31 @@ public class ZkStateReader implements Closeable { // Nothing to do, someone else updated same or newer. return; } - Set liveNodes = this.liveNodes; // volatile read - for (Map.Entry watchEntry : this.collectionWatches.entrySet()) { - String coll = watchEntry.getKey(); - CollectionWatch collWatch = watchEntry.getValue(); + Map updatedCollections = new HashMap<>(); + for (String coll : this.collectionWatches.keySet()) { ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll); - if (ref == null) - continue; // legacy collections are always in-memory - DocCollection oldState = ref.get(); + DocCollection oldState = ref == null ? null : ref.get(); ClusterState.CollectionRef newRef = loadedData.getCollectionStates().get(coll); DocCollection newState = newRef == null ? null : newRef.get(); - if (!collWatch.stateWatchers.isEmpty() - && !Objects.equals(oldState, newState)) { - notifyStateWatchers(liveNodes, coll, newState); + if (newState == null) { + // check that we haven't just migrated + newState = watchedCollectionStates.get(coll); + } + if (!Objects.equals(oldState, newState)) { + updatedCollections.put(coll, newState); } } this.legacyCollectionStates = loadedData.getCollectionStates(); this.legacyClusterStateVersion = stat.getVersion(); + constructState(updatedCollections); } } catch (KeeperException.NoNodeException e) { // Ignore missing legacy clusterstate.json. synchronized (getUpdateLock()) { this.legacyCollectionStates = emptyMap(); this.legacyClusterStateVersion = 0; + constructState(Collections.emptyMap()); } } } @@ -555,7 +574,7 @@ public class ZkStateReader implements Closeable { * * A stateFormat=1 collection which is not interesting to us can also * be put into the {@link #lazyCollectionStates} map here. But that is okay - * because {@link #constructState()} will give priority to collections in the + * because {@link #constructState(Map)} will give priority to collections in the * shared collection state over this map. * In fact this is a clever way to avoid doing a ZK exists check on * the /collections/collection_name/state.json znode @@ -908,15 +927,11 @@ public class ZkStateReader implements Closeable { return; } - int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 - : ZkStateReader.this.clusterState.getLiveNodes().size(); + Set liveNodes = ZkStateReader.this.liveNodes; LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])", - event, coll, liveNodesSize); + event, coll, liveNodes.size()); refreshAndWatch(); - synchronized (getUpdateLock()) { - constructState(); - } } @@ -929,6 +944,10 @@ public class ZkStateReader implements Closeable { try { DocCollection newState = fetchCollectionState(coll, this); updateWatchedCollection(coll, newState); + synchronized (getUpdateLock()) { + constructState(Collections.singletonMap(coll, newState)); + } + } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { @@ -953,9 +972,6 @@ public class ZkStateReader implements Closeable { int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size(); LOG.info("A cluster state change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodesSize); refreshAndWatch(); - synchronized (getUpdateLock()) { - constructState(); - } } /** Must hold {@link #getUpdateLock()} before calling this method. */ @@ -990,7 +1006,7 @@ public class ZkStateReader implements Closeable { LOG.info("A collections change: [{}], has occurred - updating...", event); refreshAndWatch(); synchronized (getUpdateLock()) { - constructState(); + constructState(Collections.emptyMap()); } } @@ -1109,9 +1125,6 @@ public class ZkStateReader implements Closeable { }); if (reconstructState.get()) { new StateWatcher(collection).refreshAndWatch(); - synchronized (getUpdateLock()) { - constructState(); - } } } @@ -1142,7 +1155,7 @@ public class ZkStateReader implements Closeable { }); if (reconstructState.get()) { synchronized (getUpdateLock()) { - constructState(); + constructState(Collections.emptyMap()); } } } @@ -1160,17 +1173,14 @@ public class ZkStateReader implements Closeable { v.stateWatchers.add(stateWatcher); return v; }); + if (watchSet.get()) { new StateWatcher(collection).refreshAndWatch(); - synchronized (getUpdateLock()) { - constructState(); - } } - else { - DocCollection state = clusterState.getCollectionOrNull(collection); - if (stateWatcher.onStateChanged(liveNodes, state) == true) { - removeCollectionStateWatcher(collection, stateWatcher); - } + + DocCollection state = clusterState.getCollectionOrNull(collection); + if (stateWatcher.onStateChanged(liveNodes, state) == true) { + removeCollectionStateWatcher(collection, stateWatcher); } } @@ -1240,17 +1250,15 @@ public class ZkStateReader implements Closeable { } // returns true if the state has changed - private void updateWatchedCollection(String coll, DocCollection newState) { - - Set liveNodes = this.liveNodes; // volatile read + private boolean updateWatchedCollection(String coll, DocCollection newState) { if (newState == null) { LOG.info("Deleting data for [{}]", coll); watchedCollectionStates.remove(coll); - notifyStateWatchers(liveNodes, coll, null); - return; + return true; } + boolean updated = false; // CAS update loop while (true) { if (!collectionWatches.containsKey(coll)) { @@ -1260,19 +1268,19 @@ public class ZkStateReader implements Closeable { if (oldState == null) { if (watchedCollectionStates.putIfAbsent(coll, newState) == null) { LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion()); - notifyStateWatchers(liveNodes, coll, newState); + updated = true; break; } } else { if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) { // no change to state, but we might have been triggered by the addition of a // state watcher, so run notifications - notifyStateWatchers(liveNodes, coll, newState); + updated = true; break; } if (watchedCollectionStates.replace(coll, oldState, newState)) { LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion()); - notifyStateWatchers(liveNodes, coll, newState); + updated = true; break; } } @@ -1284,6 +1292,7 @@ public class ZkStateReader implements Closeable { LOG.info("Removing uninteresting collection [{}]", coll); } + return updated; } public static class ConfigData {