mirror of https://github.com/apache/lucene.git
SOLR-5756: Fix for race condition between unwatch and watcher fire event
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1696335 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a11036826b
commit
3109c453bb
|
@ -21,6 +21,7 @@ import java.io.Closeable;
|
|||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLDecoder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -254,7 +255,9 @@ public class ZkStateReader implements Closeable {
|
|||
}
|
||||
// No need to set watchers because we should already have watchers registered for everything.
|
||||
refreshLegacyClusterState(null);
|
||||
for (String coll : watchedCollectionStates.keySet()) {
|
||||
// Need a copy so we don't delete from what we're iterating over.
|
||||
Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
|
||||
for (String coll : safeCopy) {
|
||||
DocCollection newState = fetchCollectionState(coll, null);
|
||||
updateWatchedCollection(coll, newState);
|
||||
}
|
||||
|
@ -450,6 +453,13 @@ public class ZkStateReader implements Closeable {
|
|||
}
|
||||
|
||||
this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
|
||||
log.debug("clusterStateSet: version {} legacy {} interesting {} watched {} lazy {} total {}",
|
||||
clusterState.getZkClusterStateVersion(),
|
||||
legacyCollectionStates.keySet(),
|
||||
interestingCollections,
|
||||
watchedCollectionStates.keySet(),
|
||||
lazyCollectionStates.keySet(),
|
||||
clusterState.getCollections());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1023,34 +1033,47 @@ public class ZkStateReader implements Closeable {
|
|||
return;
|
||||
}
|
||||
|
||||
log.info("Updating data for {} to ver {} ", coll, newState.getZNodeVersion());
|
||||
// CAS update loop
|
||||
while (true) {
|
||||
if (!interestingCollections.contains(coll)) {
|
||||
break;
|
||||
}
|
||||
DocCollection oldState = watchedCollectionStates.get(coll);
|
||||
if (oldState == null) {
|
||||
if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
|
||||
log.info("Add data for {} ver {} ", coll, newState.getZNodeVersion());
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
|
||||
// Nothing to do, someone else updated same or newer.
|
||||
return;
|
||||
break;
|
||||
}
|
||||
if (watchedCollectionStates.replace(coll, oldState, newState)) {
|
||||
log.info("Updating data for {} from {} to {} ", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve race with removeZKWatch.
|
||||
if (!interestingCollections.contains(coll)) {
|
||||
watchedCollectionStates.remove(coll);
|
||||
log.info("Removing uninteresting collection {}", coll);
|
||||
}
|
||||
}
|
||||
|
||||
/** This is not a public API. Only used by ZkController */
|
||||
public void removeZKWatch(String coll) {
|
||||
log.info("Removing watch for uninteresting collection {}", coll);
|
||||
interestingCollections.remove(coll);
|
||||
watchedCollectionStates.remove(coll);
|
||||
ClusterState.CollectionRef lazyCollectionStateFormat2 = tryMakeLazyCollectionStateFormat2(coll);
|
||||
synchronized (getUpdateLock()) {
|
||||
if (lazyCollectionStateFormat2 != null) {
|
||||
this.lazyCollectionStates.put(coll, lazyCollectionStateFormat2);
|
||||
} else {
|
||||
this.lazyCollectionStates.remove(coll);
|
||||
}
|
||||
constructState();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue