diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index cd7e41c40bf..2e98189a790 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -77,7 +77,7 @@ import static org.apache.solr.common.util.Utils.fromJSON; public class ZkStateReader implements SolrCloseable { public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - + public static final String BASE_URL_PROP = "base_url"; public static final String NODE_NAME_PROP = "node_name"; public static final String CORE_NODE_NAME_PROP = "core_node_name"; @@ -140,7 +140,7 @@ public class ZkStateReader implements SolrCloseable { public static final String COLLECTION_DEF = "collectionDefaults"; public static final String URL_SCHEME = "urlScheme"; - + public static final String REPLICA_TYPE = "type"; /** A view of the current state of all collections; combines all the different state sources into a single view. */ @@ -167,7 +167,7 @@ public class ZkStateReader implements SolrCloseable { private final ConcurrentHashMap lazyCollectionStates = new ConcurrentHashMap<>(); /** Collection properties being actively watched */ - private final ConcurrentHashMap> watchedCollectionProps = new ConcurrentHashMap<>(); + private final ConcurrentHashMap watchedCollectionProps = new ConcurrentHashMap<>(); private volatile SortedSet liveNodes = emptySortedSet(); @@ -188,7 +188,7 @@ public class ZkStateReader implements SolrCloseable { private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches"); private Set liveNodesListeners = ConcurrentHashMap.newKeySet(); - + /** Used to submit notifications to Collection Properties watchers in order **/ private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("collectionPropsNotifications")); @@ -285,11 +285,11 @@ public class ZkStateReader implements SolrCloseable { private final SolrZkClient zkClient; - + private final boolean closeClient; private volatile boolean closed = false; - + private Set waitLatches = ConcurrentHashMap.newKeySet(); public ZkStateReader(SolrZkClient zkClient) { @@ -327,7 +327,7 @@ public class ZkStateReader implements SolrCloseable { this.configManager = new ZkConfigManager(zkClient); this.closeClient = true; this.securityNodeListener = null; - + assert ObjectReleaseTracker.track(this); } @@ -339,7 +339,7 @@ public class ZkStateReader implements SolrCloseable { * Forcibly refresh cluster state from ZK. Do this only to avoid race conditions because it's expensive. * * It is cheaper to call {@link #forceUpdateCollection(String)} on a single collection if you must. - * + * * @lucene.internal */ public void forciblyRefreshAllClusterStateSlow() throws KeeperException, InterruptedException { @@ -437,16 +437,16 @@ public class ZkStateReader implements SolrCloseable { collection = nu; } } - + if (collection.getZNodeVersion() == version) { return null; } - + log.debug("Wrong version from client [{}]!=[{}]", version, collection.getZNodeVersion()); - + return collection.getZNodeVersion(); } - + public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException, InterruptedException { // We need to fetch the current cluster state and the set of live nodes @@ -818,7 +818,7 @@ public class ZkStateReader implements SolrCloseable { if (listener.onChange(new TreeSet<>(getClusterState().getLiveNodes()), new TreeSet<>(getClusterState().getLiveNodes()))) { removeLiveNodesListener(listener); } - + liveNodesListeners.add(listener); } @@ -832,18 +832,18 @@ public class ZkStateReader implements SolrCloseable { public ClusterState getClusterState() { return clusterState; } - + public Object getUpdateLock() { return this; } public void close() { this.closed = true; - + notifications.shutdownNow(); - + waitLatches.parallelStream().forEach(c -> { c.countDown(); }); - + ExecutorUtil.shutdownAndAwaitTermination(notifications); ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications); if (closeClient) { @@ -856,12 +856,12 @@ public class ZkStateReader implements SolrCloseable { public boolean isClosed() { return closed; } - + public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException { ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout)); return props.getCoreUrl(); } - + public Replica getLeader(Set liveNodes, DocCollection docCollection, String shard) { Replica replica = docCollection != null ? docCollection.getLeader(shard) : null; if (replica != null && liveNodes.contains(replica.getNodeName())) { @@ -934,18 +934,18 @@ public class ZkStateReader implements SolrCloseable { public List getReplicaProps(String collection, String shardId, String thisCoreNodeName) { return getReplicaProps(collection, shardId, thisCoreNodeName, null); } - + public List getReplicaProps(String collection, String shardId, String thisCoreNodeName, Replica.State mustMatchStateFilter) { return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null); } - + public List getReplicaProps(String collection, String shardId, String thisCoreNodeName, Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter) { //TODO: We don't need all these getReplicaProps method overloading. Also, it's odd that the default is to return replicas of type TLOG and NRT only return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)); } - + public List getReplicaProps(String collection, String shardId, String thisCoreNodeName, Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter, final EnumSet acceptReplicaType) { assert thisCoreNodeName != null; @@ -958,20 +958,20 @@ public class ZkStateReader implements SolrCloseable { throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find collection in zk: " + collection); } - + Map slices = docCollection.getSlicesMap(); Slice replicas = slices.get(shardId); if (replicas == null) { throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId); } - + Map shardMap = replicas.getReplicasMap(); List nodes = new ArrayList<>(shardMap.size()); for (Entry entry : shardMap.entrySet().stream().filter((e)->acceptReplicaType.contains(e.getValue().getType())).collect(Collectors.toList())) { ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue()); - + String coreNodeName = entry.getValue().getName(); - + if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) { if (mustMatchStateFilter == null || mustMatchStateFilter == Replica.State.getState(nodeProps.getState())) { if (mustNotMatchStateFilter == null || mustNotMatchStateFilter != Replica.State.getState(nodeProps.getState())) { @@ -1074,17 +1074,30 @@ public class ZkStateReader implements SolrCloseable { * otherwise fetch it directly from zookeeper. */ public Map getCollectionProperties(final String collection) { - Map properties = watchedCollectionProps.get(collection); - if (properties == null) { - try { - properties = fetchCollectionProperties(collection, null); - // Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set. - } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e)); + synchronized (watchedCollectionProps) { // making decisions based on the result of a get... + VersionedCollectionProps vprops = watchedCollectionProps.get(collection); + Map properties = vprops != null ? vprops.props : null; + if (properties == null) { + try { + // todo: maybe we want to store/watch since if someone's calling this it's probably going to get called again? + // Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set. + properties = fetchCollectionProperties(collection, null ).props; + } catch (Exception e) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e)); + } } + return properties; + } + } + + private class VersionedCollectionProps { + public VersionedCollectionProps(int zkVersion, Map props) { + this.zkVersion = zkVersion; + this.props = props; } - return properties; + int zkVersion; + Map props; } static String getCollectionPropsPath(final String collection) { @@ -1092,12 +1105,13 @@ public class ZkStateReader implements SolrCloseable { } @SuppressWarnings("unchecked") - private Map fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException { + private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException { final String znodePath = getCollectionPropsPath(collection); while (true) { try { - byte[] data = zkClient.getData(znodePath, watcher, null, true); - return (Map) Utils.fromJSON(data); + Stat stat = new Stat(); + byte[] data = zkClient.getData(znodePath, watcher, stat, true); + return new VersionedCollectionProps(stat.getVersion(),(Map) Utils.fromJSON(data)); } catch (ClassCastException e) { throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e); } catch (KeeperException.NoNodeException e) { @@ -1110,7 +1124,7 @@ public class ZkStateReader implements SolrCloseable { continue; } } - return Collections.emptyMap(); + return new VersionedCollectionProps(-1, EMPTY_MAP); } } } @@ -1272,18 +1286,17 @@ public class ZkStateReader implements SolrCloseable { */ void refreshAndWatch(boolean notifyWatchers) { try { - synchronized (coll) { // We only have one PropsWatcher instance per collection, so it's fine to sync on coll - Map properties = fetchCollectionProperties(coll, this); - watchedCollectionProps.put(coll, properties); - /* - * Note that if two events were fired close to each other and the second one arrived first, we would read the collectionprops.json - * twice for the same data and notify watchers (in case of notifyWatchers==true) twice for the same data, however it's guaranteed - * that after processing both events, watchedCollectionProps will have the latest data written to ZooKeeper and that the watchers - * won't be called with out of order data - * - */ - if (notifyWatchers) { - notifyPropsWatchers(coll, properties); + synchronized (watchedCollectionProps) { // making decisions based on the result of a get... + VersionedCollectionProps vcp = fetchCollectionProperties(coll, this); + Map properties = vcp.props; + VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll); + if (existingVcp == null || // never called before, record what we found + vcp.zkVersion > existingVcp.zkVersion || // newer info we should update + vcp.zkVersion == -1) { // node was deleted start over + watchedCollectionProps.put(coll, vcp); + if (notifyWatchers) { + notifyPropsWatchers(coll, properties); + } } } } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { @@ -1306,7 +1319,7 @@ public class ZkStateReader implements SolrCloseable { if (ZkStateReader.this.closed) { return; } - + // session events are not change events, and do not remove the watcher if (EventType.None.equals(event.getType())) { return; @@ -1507,11 +1520,11 @@ public class ZkStateReader implements SolrCloseable { */ public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate) throws InterruptedException, TimeoutException { - + if (closed) { throw new AlreadyClosedException(); } - + final CountDownLatch latch = new CountDownLatch(1); waitLatches.add(latch); AtomicReference docCollection = new AtomicReference<>(); @@ -1520,7 +1533,7 @@ public class ZkStateReader implements SolrCloseable { boolean matches = predicate.matches(n, c); if (matches) latch.countDown(); - + return matches; }; registerCollectionStateWatcher(collection, watcher); @@ -1551,22 +1564,22 @@ public class ZkStateReader implements SolrCloseable { */ public void waitForLiveNodes(long wait, TimeUnit unit, LiveNodesPredicate predicate) throws InterruptedException, TimeoutException { - + if (closed) { throw new AlreadyClosedException(); } - + final CountDownLatch latch = new CountDownLatch(1); waitLatches.add(latch); - + LiveNodesListener listener = (o, n) -> { boolean matches = predicate.matches(o, n); if (matches) latch.countDown(); return matches; }; - + registerLiveNodesListener(listener); try { @@ -1581,7 +1594,7 @@ public class ZkStateReader implements SolrCloseable { } } - + /** * Remove a watcher from a collection's watch list. * @@ -1692,7 +1705,10 @@ public class ZkStateReader implements SolrCloseable { return null; v.stateWatchers.remove(watcher); if (v.canBeRemoved()) { - watchedCollectionProps.remove(collection); + // don't want this to happen in middle of other blocks that might add it back. + synchronized (watchedCollectionProps) { + watchedCollectionProps.remove(collection); + } return null; } return v;