SOLR-9181: Fix race in constructState() and missing call in forceUpdateCollection()

This commit is contained in:
Alan Woodward 2016-07-05 15:30:07 +01:00
parent 91a9d96454
commit be8d56ada6
1 changed files with 19 additions and 19 deletions

View File

@ -23,7 +23,6 @@ import java.net.URLDecoder;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
@ -271,11 +270,11 @@ public class ZkStateReader implements Closeable {
refreshLegacyClusterState(null); refreshLegacyClusterState(null);
// Need a copy so we don't delete from what we're iterating over. // Need a copy so we don't delete from what we're iterating over.
Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet()); Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
Map<String, DocCollection> updatedCollections = new HashMap<>(); Set<String> updatedCollections = new HashSet<>();
for (String coll : safeCopy) { for (String coll : safeCopy) {
DocCollection newState = fetchCollectionState(coll, null); DocCollection newState = fetchCollectionState(coll, null);
if (updateWatchedCollection(coll, newState)) { if (updateWatchedCollection(coll, newState)) {
updatedCollections.put(coll, newState); updatedCollections.add(coll);
} }
} }
constructState(updatedCollections); constructState(updatedCollections);
@ -305,7 +304,9 @@ public class ZkStateReader implements Closeable {
LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection); LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection);
if (tryLazyCollection.get() != null) { if (tryLazyCollection.get() != null) {
// What do you know, it exists! // What do you know, it exists!
LOG.info("Adding lazily-loaded reference for collection {}", collection);
lazyCollectionStates.putIfAbsent(collection, tryLazyCollection); lazyCollectionStates.putIfAbsent(collection, tryLazyCollection);
constructState(Collections.singleton(collection));
} }
} }
} else if (ref.isLazilyLoaded()) { } else if (ref.isLazilyLoaded()) {
@ -320,10 +321,9 @@ public class ZkStateReader implements Closeable {
LOG.info("Forcing refresh of watched collection state for {}", collection); LOG.info("Forcing refresh of watched collection state for {}", collection);
DocCollection newState = fetchCollectionState(collection, null); DocCollection newState = fetchCollectionState(collection, null);
if (updateWatchedCollection(collection, newState)) { if (updateWatchedCollection(collection, newState)) {
constructState(Collections.singletonMap(collection, newState)); constructState(Collections.singleton(collection));
} }
} } else {
else {
LOG.error("Collection {} is not lazy or watched!", collection); LOG.error("Collection {} is not lazy or watched!", collection);
} }
} }
@ -349,7 +349,7 @@ public class ZkStateReader implements Closeable {
if (nu.getZNodeVersion() > collection.getZNodeVersion()) { if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
if (updateWatchedCollection(coll, nu)) { if (updateWatchedCollection(coll, nu)) {
synchronized (getUpdateLock()) { synchronized (getUpdateLock()) {
constructState(Collections.singletonMap(coll, nu)); constructState(Collections.singleton(coll));
} }
} }
collection = nu; collection = nu;
@ -385,7 +385,7 @@ public class ZkStateReader implements Closeable {
refreshCollectionList(new CollectionsChildWatcher()); refreshCollectionList(new CollectionsChildWatcher());
synchronized (ZkStateReader.this.getUpdateLock()) { synchronized (ZkStateReader.this.getUpdateLock()) {
constructState(Collections.emptyMap()); constructState(Collections.emptySet());
zkClient.exists(ALIASES, zkClient.exists(ALIASES,
new Watcher() { new Watcher() {
@ -482,7 +482,7 @@ public class ZkStateReader implements Closeable {
* @param changedCollections collections that have changed since the last call, * @param changedCollections collections that have changed since the last call,
* and that should fire notifications * and that should fire notifications
*/ */
private void constructState(Map<String, DocCollection> changedCollections) { private void constructState(Set<String> changedCollections) {
Set<String> liveNodes = this.liveNodes; // volatile read Set<String> liveNodes = this.liveNodes; // volatile read
@ -518,9 +518,10 @@ public class ZkStateReader implements Closeable {
clusterState.getCollectionStates()); clusterState.getCollectionStates());
} }
for (Map.Entry<String, DocCollection> entry : changedCollections.entrySet()) { for (String collection : changedCollections) {
notifyStateWatchers(liveNodes, entry.getKey(), entry.getValue()); notifyStateWatchers(liveNodes, collection, clusterState.getCollectionOrNull(collection));
} }
} }
/** /**
@ -536,7 +537,7 @@ public class ZkStateReader implements Closeable {
// Nothing to do, someone else updated same or newer. // Nothing to do, someone else updated same or newer.
return; return;
} }
Map<String, DocCollection> updatedCollections = new HashMap<>(); Set<String> updatedCollections = new HashSet<>();
for (String coll : this.collectionWatches.keySet()) { for (String coll : this.collectionWatches.keySet()) {
ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll); ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll);
// legacy collections are always in-memory // legacy collections are always in-memory
@ -548,7 +549,7 @@ public class ZkStateReader implements Closeable {
newState = watchedCollectionStates.get(coll); newState = watchedCollectionStates.get(coll);
} }
if (!Objects.equals(oldState, newState)) { if (!Objects.equals(oldState, newState)) {
updatedCollections.put(coll, newState); updatedCollections.add(coll);
} }
} }
this.legacyCollectionStates = loadedData.getCollectionStates(); this.legacyCollectionStates = loadedData.getCollectionStates();
@ -560,7 +561,7 @@ public class ZkStateReader implements Closeable {
synchronized (getUpdateLock()) { synchronized (getUpdateLock()) {
this.legacyCollectionStates = emptyMap(); this.legacyCollectionStates = emptyMap();
this.legacyClusterStateVersion = 0; this.legacyClusterStateVersion = 0;
constructState(Collections.emptyMap()); constructState(Collections.emptySet());
} }
} }
} }
@ -582,7 +583,7 @@ public class ZkStateReader implements Closeable {
* *
* A stateFormat=1 collection which is not interesting to us can also * A stateFormat=1 collection which is not interesting to us can also
* be put into the {@link #lazyCollectionStates} map here. But that is okay * be put into the {@link #lazyCollectionStates} map here. But that is okay
* because {@link #constructState(Map)} will give priority to collections in the * because {@link #constructState(Set)} will give priority to collections in the
* shared collection state over this map. * shared collection state over this map.
* In fact this is a clever way to avoid doing a ZK exists check on * In fact this is a clever way to avoid doing a ZK exists check on
* the /collections/collection_name/state.json znode * the /collections/collection_name/state.json znode
@ -615,7 +616,6 @@ public class ZkStateReader implements Closeable {
// Double check contains just to avoid allocating an object. // Double check contains just to avoid allocating an object.
LazyCollectionRef existing = lazyCollectionStates.get(coll); LazyCollectionRef existing = lazyCollectionStates.get(coll);
if (existing == null) { if (existing == null) {
LOG.info("Adding lazy collectionRef for collection {}", coll);
lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll)); lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
} }
} }
@ -954,7 +954,7 @@ public class ZkStateReader implements Closeable {
DocCollection newState = fetchCollectionState(coll, this); DocCollection newState = fetchCollectionState(coll, this);
updateWatchedCollection(coll, newState); updateWatchedCollection(coll, newState);
synchronized (getUpdateLock()) { synchronized (getUpdateLock()) {
constructState(Collections.singletonMap(coll, newState)); constructState(Collections.singleton(coll));
} }
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
@ -1015,7 +1015,7 @@ public class ZkStateReader implements Closeable {
LOG.info("A collections change: [{}], has occurred - updating...", event); LOG.info("A collections change: [{}], has occurred - updating...", event);
refreshAndWatch(); refreshAndWatch();
synchronized (getUpdateLock()) { synchronized (getUpdateLock()) {
constructState(Collections.emptyMap()); constructState(Collections.emptySet());
} }
} }
@ -1164,7 +1164,7 @@ public class ZkStateReader implements Closeable {
}); });
if (reconstructState.get()) { if (reconstructState.get()) {
synchronized (getUpdateLock()) { synchronized (getUpdateLock()) {
constructState(Collections.emptyMap()); constructState(Collections.emptySet());
} }
} }
} }