mirror of https://github.com/apache/lucene.git
SOLR-13418 - safer synchronization and zk version checking for collection properties
This commit is contained in:
parent
2261b4e8e1
commit
80d3ac8709
|
@ -167,7 +167,7 @@ public class ZkStateReader implements SolrCloseable {
|
||||||
private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
/** Collection properties being actively watched */
|
/** Collection properties being actively watched */
|
||||||
private final ConcurrentHashMap<String, Map<String, String>> watchedCollectionProps = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private volatile SortedSet<String> liveNodes = emptySortedSet();
|
private volatile SortedSet<String> liveNodes = emptySortedSet();
|
||||||
|
|
||||||
|
@ -1074,17 +1074,30 @@ public class ZkStateReader implements SolrCloseable {
|
||||||
* otherwise fetch it directly from zookeeper.
|
* otherwise fetch it directly from zookeeper.
|
||||||
*/
|
*/
|
||||||
public Map<String, String> getCollectionProperties(final String collection) {
|
public Map<String, String> getCollectionProperties(final String collection) {
|
||||||
Map<String, String> properties = watchedCollectionProps.get(collection);
|
synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
|
||||||
if (properties == null) {
|
VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
|
||||||
try {
|
Map<String, String> properties = vprops != null ? vprops.props : null;
|
||||||
properties = fetchCollectionProperties(collection, null);
|
if (properties == null) {
|
||||||
// Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set.
|
try {
|
||||||
} catch (Exception e) {
|
// todo: maybe we want to store/watch since if someone's calling this it's probably going to get called again?
|
||||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
|
// Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set.
|
||||||
|
properties = fetchCollectionProperties(collection, null ).props;
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class VersionedCollectionProps {
|
||||||
|
public VersionedCollectionProps(int zkVersion, Map<String, String> props) {
|
||||||
|
this.zkVersion = zkVersion;
|
||||||
|
this.props = props;
|
||||||
}
|
}
|
||||||
|
|
||||||
return properties;
|
int zkVersion;
|
||||||
|
Map<String,String> props;
|
||||||
}
|
}
|
||||||
|
|
||||||
static String getCollectionPropsPath(final String collection) {
|
static String getCollectionPropsPath(final String collection) {
|
||||||
|
@ -1092,12 +1105,13 @@ public class ZkStateReader implements SolrCloseable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private Map<String, String> fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
|
private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
|
||||||
final String znodePath = getCollectionPropsPath(collection);
|
final String znodePath = getCollectionPropsPath(collection);
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
byte[] data = zkClient.getData(znodePath, watcher, null, true);
|
Stat stat = new Stat();
|
||||||
return (Map<String, String>) Utils.fromJSON(data);
|
byte[] data = zkClient.getData(znodePath, watcher, stat, true);
|
||||||
|
return new VersionedCollectionProps(stat.getVersion(),(Map<String, String>) Utils.fromJSON(data));
|
||||||
} catch (ClassCastException e) {
|
} catch (ClassCastException e) {
|
||||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
|
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
|
||||||
} catch (KeeperException.NoNodeException e) {
|
} catch (KeeperException.NoNodeException e) {
|
||||||
|
@ -1110,7 +1124,7 @@ public class ZkStateReader implements SolrCloseable {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Collections.emptyMap();
|
return new VersionedCollectionProps(-1, EMPTY_MAP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1272,18 +1286,17 @@ public class ZkStateReader implements SolrCloseable {
|
||||||
*/
|
*/
|
||||||
void refreshAndWatch(boolean notifyWatchers) {
|
void refreshAndWatch(boolean notifyWatchers) {
|
||||||
try {
|
try {
|
||||||
synchronized (coll) { // We only have one PropsWatcher instance per collection, so it's fine to sync on coll
|
synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
|
||||||
Map<String, String> properties = fetchCollectionProperties(coll, this);
|
VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
|
||||||
watchedCollectionProps.put(coll, properties);
|
Map<String, String> properties = vcp.props;
|
||||||
/*
|
VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
|
||||||
* Note that if two events were fired close to each other and the second one arrived first, we would read the collectionprops.json
|
if (existingVcp == null || // never called before, record what we found
|
||||||
* twice for the same data and notify watchers (in case of notifyWatchers==true) twice for the same data, however it's guaranteed
|
vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
|
||||||
* that after processing both events, watchedCollectionProps will have the latest data written to ZooKeeper and that the watchers
|
vcp.zkVersion == -1) { // node was deleted start over
|
||||||
* won't be called with out of order data
|
watchedCollectionProps.put(coll, vcp);
|
||||||
*
|
if (notifyWatchers) {
|
||||||
*/
|
notifyPropsWatchers(coll, properties);
|
||||||
if (notifyWatchers) {
|
}
|
||||||
notifyPropsWatchers(coll, properties);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
|
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
|
||||||
|
@ -1692,7 +1705,10 @@ public class ZkStateReader implements SolrCloseable {
|
||||||
return null;
|
return null;
|
||||||
v.stateWatchers.remove(watcher);
|
v.stateWatchers.remove(watcher);
|
||||||
if (v.canBeRemoved()) {
|
if (v.canBeRemoved()) {
|
||||||
watchedCollectionProps.remove(collection);
|
// don't want this to happen in middle of other blocks that might add it back.
|
||||||
|
synchronized (watchedCollectionProps) {
|
||||||
|
watchedCollectionProps.remove(collection);
|
||||||
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return v;
|
return v;
|
||||||
|
|
Loading…
Reference in New Issue