mirror of https://github.com/apache/lucene.git
SOLR-13418 - safer synchronization and zk version checking for collection properties
(cherry picked from commit 80d3ac8709
)
This commit is contained in:
parent
e02f25d975
commit
0cfd85baef
|
@ -77,7 +77,7 @@ import static org.apache.solr.common.util.Utils.fromJSON;
|
|||
public class ZkStateReader implements SolrCloseable {
|
||||
public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
|
||||
public static final String BASE_URL_PROP = "base_url";
|
||||
public static final String NODE_NAME_PROP = "node_name";
|
||||
public static final String CORE_NODE_NAME_PROP = "core_node_name";
|
||||
|
@ -140,7 +140,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
public static final String COLLECTION_DEF = "collectionDefaults";
|
||||
|
||||
public static final String URL_SCHEME = "urlScheme";
|
||||
|
||||
|
||||
public static final String REPLICA_TYPE = "type";
|
||||
|
||||
/** A view of the current state of all collections; combines all the different state sources into a single view. */
|
||||
|
@ -167,7 +167,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
|
||||
|
||||
/** Collection properties being actively watched */
|
||||
private final ConcurrentHashMap<String, Map<String, String>> watchedCollectionProps = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
|
||||
|
||||
private volatile SortedSet<String> liveNodes = emptySortedSet();
|
||||
|
||||
|
@ -188,7 +188,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
|
||||
|
||||
private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
|
||||
|
||||
|
||||
/** Used to submit notifications to Collection Properties watchers in order **/
|
||||
private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("collectionPropsNotifications"));
|
||||
|
||||
|
@ -285,11 +285,11 @@ public class ZkStateReader implements SolrCloseable {
|
|||
|
||||
|
||||
private final SolrZkClient zkClient;
|
||||
|
||||
|
||||
private final boolean closeClient;
|
||||
|
||||
private volatile boolean closed = false;
|
||||
|
||||
|
||||
private Set<CountDownLatch> waitLatches = ConcurrentHashMap.newKeySet();
|
||||
|
||||
public ZkStateReader(SolrZkClient zkClient) {
|
||||
|
@ -327,7 +327,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
this.configManager = new ZkConfigManager(zkClient);
|
||||
this.closeClient = true;
|
||||
this.securityNodeListener = null;
|
||||
|
||||
|
||||
assert ObjectReleaseTracker.track(this);
|
||||
}
|
||||
|
||||
|
@ -339,7 +339,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
* Forcibly refresh cluster state from ZK. Do this only to avoid race conditions because it's expensive.
|
||||
*
|
||||
* It is cheaper to call {@link #forceUpdateCollection(String)} on a single collection if you must.
|
||||
*
|
||||
*
|
||||
* @lucene.internal
|
||||
*/
|
||||
public void forciblyRefreshAllClusterStateSlow() throws KeeperException, InterruptedException {
|
||||
|
@ -437,16 +437,16 @@ public class ZkStateReader implements SolrCloseable {
|
|||
collection = nu;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (collection.getZNodeVersion() == version) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
log.debug("Wrong version from client [{}]!=[{}]", version, collection.getZNodeVersion());
|
||||
|
||||
|
||||
return collection.getZNodeVersion();
|
||||
}
|
||||
|
||||
|
||||
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
|
||||
InterruptedException {
|
||||
// We need to fetch the current cluster state and the set of live nodes
|
||||
|
@ -818,7 +818,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
if (listener.onChange(new TreeSet<>(getClusterState().getLiveNodes()), new TreeSet<>(getClusterState().getLiveNodes()))) {
|
||||
removeLiveNodesListener(listener);
|
||||
}
|
||||
|
||||
|
||||
liveNodesListeners.add(listener);
|
||||
}
|
||||
|
||||
|
@ -832,18 +832,18 @@ public class ZkStateReader implements SolrCloseable {
|
|||
public ClusterState getClusterState() {
|
||||
return clusterState;
|
||||
}
|
||||
|
||||
|
||||
public Object getUpdateLock() {
|
||||
return this;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
this.closed = true;
|
||||
|
||||
|
||||
notifications.shutdownNow();
|
||||
|
||||
|
||||
waitLatches.parallelStream().forEach(c -> { c.countDown(); });
|
||||
|
||||
|
||||
ExecutorUtil.shutdownAndAwaitTermination(notifications);
|
||||
ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
|
||||
if (closeClient) {
|
||||
|
@ -856,12 +856,12 @@ public class ZkStateReader implements SolrCloseable {
|
|||
public boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
|
||||
|
||||
public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException {
|
||||
ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout));
|
||||
return props.getCoreUrl();
|
||||
}
|
||||
|
||||
|
||||
public Replica getLeader(Set<String> liveNodes, DocCollection docCollection, String shard) {
|
||||
Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
|
||||
if (replica != null && liveNodes.contains(replica.getNodeName())) {
|
||||
|
@ -934,18 +934,18 @@ public class ZkStateReader implements SolrCloseable {
|
|||
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName) {
|
||||
return getReplicaProps(collection, shardId, thisCoreNodeName, null);
|
||||
}
|
||||
|
||||
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
|
||||
Replica.State mustMatchStateFilter) {
|
||||
return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null);
|
||||
}
|
||||
|
||||
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
|
||||
Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter) {
|
||||
//TODO: We don't need all these getReplicaProps method overloading. Also, it's odd that the default is to return replicas of type TLOG and NRT only
|
||||
return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT));
|
||||
}
|
||||
|
||||
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
|
||||
Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter, final EnumSet<Replica.Type> acceptReplicaType) {
|
||||
assert thisCoreNodeName != null;
|
||||
|
@ -958,20 +958,20 @@ public class ZkStateReader implements SolrCloseable {
|
|||
throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
|
||||
"Could not find collection in zk: " + collection);
|
||||
}
|
||||
|
||||
|
||||
Map<String,Slice> slices = docCollection.getSlicesMap();
|
||||
Slice replicas = slices.get(shardId);
|
||||
if (replicas == null) {
|
||||
throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId);
|
||||
}
|
||||
|
||||
|
||||
Map<String,Replica> shardMap = replicas.getReplicasMap();
|
||||
List<ZkCoreNodeProps> nodes = new ArrayList<>(shardMap.size());
|
||||
for (Entry<String,Replica> entry : shardMap.entrySet().stream().filter((e)->acceptReplicaType.contains(e.getValue().getType())).collect(Collectors.toList())) {
|
||||
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
|
||||
|
||||
|
||||
String coreNodeName = entry.getValue().getName();
|
||||
|
||||
|
||||
if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) {
|
||||
if (mustMatchStateFilter == null || mustMatchStateFilter == Replica.State.getState(nodeProps.getState())) {
|
||||
if (mustNotMatchStateFilter == null || mustNotMatchStateFilter != Replica.State.getState(nodeProps.getState())) {
|
||||
|
@ -1074,17 +1074,30 @@ public class ZkStateReader implements SolrCloseable {
|
|||
* otherwise fetch it directly from zookeeper.
|
||||
*/
|
||||
public Map<String, String> getCollectionProperties(final String collection) {
|
||||
Map<String, String> properties = watchedCollectionProps.get(collection);
|
||||
if (properties == null) {
|
||||
try {
|
||||
properties = fetchCollectionProperties(collection, null);
|
||||
// Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set.
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
|
||||
synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
|
||||
VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
|
||||
Map<String, String> properties = vprops != null ? vprops.props : null;
|
||||
if (properties == null) {
|
||||
try {
|
||||
// todo: maybe we want to store/watch since if someone's calling this it's probably going to get called again?
|
||||
// Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set.
|
||||
properties = fetchCollectionProperties(collection, null ).props;
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
|
||||
}
|
||||
}
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
|
||||
private class VersionedCollectionProps {
|
||||
public VersionedCollectionProps(int zkVersion, Map<String, String> props) {
|
||||
this.zkVersion = zkVersion;
|
||||
this.props = props;
|
||||
}
|
||||
|
||||
return properties;
|
||||
int zkVersion;
|
||||
Map<String,String> props;
|
||||
}
|
||||
|
||||
static String getCollectionPropsPath(final String collection) {
|
||||
|
@ -1092,12 +1105,13 @@ public class ZkStateReader implements SolrCloseable {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, String> fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
|
||||
private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
|
||||
final String znodePath = getCollectionPropsPath(collection);
|
||||
while (true) {
|
||||
try {
|
||||
byte[] data = zkClient.getData(znodePath, watcher, null, true);
|
||||
return (Map<String, String>) Utils.fromJSON(data);
|
||||
Stat stat = new Stat();
|
||||
byte[] data = zkClient.getData(znodePath, watcher, stat, true);
|
||||
return new VersionedCollectionProps(stat.getVersion(),(Map<String, String>) Utils.fromJSON(data));
|
||||
} catch (ClassCastException e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
|
@ -1110,7 +1124,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
continue;
|
||||
}
|
||||
}
|
||||
return Collections.emptyMap();
|
||||
return new VersionedCollectionProps(-1, EMPTY_MAP);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1272,18 +1286,17 @@ public class ZkStateReader implements SolrCloseable {
|
|||
*/
|
||||
void refreshAndWatch(boolean notifyWatchers) {
|
||||
try {
|
||||
synchronized (coll) { // We only have one PropsWatcher instance per collection, so it's fine to sync on coll
|
||||
Map<String, String> properties = fetchCollectionProperties(coll, this);
|
||||
watchedCollectionProps.put(coll, properties);
|
||||
/*
|
||||
* Note that if two events were fired close to each other and the second one arrived first, we would read the collectionprops.json
|
||||
* twice for the same data and notify watchers (in case of notifyWatchers==true) twice for the same data, however it's guaranteed
|
||||
* that after processing both events, watchedCollectionProps will have the latest data written to ZooKeeper and that the watchers
|
||||
* won't be called with out of order data
|
||||
*
|
||||
*/
|
||||
if (notifyWatchers) {
|
||||
notifyPropsWatchers(coll, properties);
|
||||
synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
|
||||
VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
|
||||
Map<String, String> properties = vcp.props;
|
||||
VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
|
||||
if (existingVcp == null || // never called before, record what we found
|
||||
vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
|
||||
vcp.zkVersion == -1) { // node was deleted start over
|
||||
watchedCollectionProps.put(coll, vcp);
|
||||
if (notifyWatchers) {
|
||||
notifyPropsWatchers(coll, properties);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
|
||||
|
@ -1306,7 +1319,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
if (ZkStateReader.this.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// session events are not change events, and do not remove the watcher
|
||||
if (EventType.None.equals(event.getType())) {
|
||||
return;
|
||||
|
@ -1507,11 +1520,11 @@ public class ZkStateReader implements SolrCloseable {
|
|||
*/
|
||||
public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
|
||||
throws InterruptedException, TimeoutException {
|
||||
|
||||
|
||||
if (closed) {
|
||||
throw new AlreadyClosedException();
|
||||
}
|
||||
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
waitLatches.add(latch);
|
||||
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
|
||||
|
@ -1520,7 +1533,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
boolean matches = predicate.matches(n, c);
|
||||
if (matches)
|
||||
latch.countDown();
|
||||
|
||||
|
||||
return matches;
|
||||
};
|
||||
registerCollectionStateWatcher(collection, watcher);
|
||||
|
@ -1551,22 +1564,22 @@ public class ZkStateReader implements SolrCloseable {
|
|||
*/
|
||||
public void waitForLiveNodes(long wait, TimeUnit unit, LiveNodesPredicate predicate)
|
||||
throws InterruptedException, TimeoutException {
|
||||
|
||||
|
||||
if (closed) {
|
||||
throw new AlreadyClosedException();
|
||||
}
|
||||
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
waitLatches.add(latch);
|
||||
|
||||
|
||||
|
||||
LiveNodesListener listener = (o, n) -> {
|
||||
boolean matches = predicate.matches(o, n);
|
||||
if (matches)
|
||||
latch.countDown();
|
||||
return matches;
|
||||
};
|
||||
|
||||
|
||||
registerLiveNodesListener(listener);
|
||||
|
||||
try {
|
||||
|
@ -1581,7 +1594,7 @@ public class ZkStateReader implements SolrCloseable {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Remove a watcher from a collection's watch list.
|
||||
*
|
||||
|
@ -1692,7 +1705,10 @@ public class ZkStateReader implements SolrCloseable {
|
|||
return null;
|
||||
v.stateWatchers.remove(watcher);
|
||||
if (v.canBeRemoved()) {
|
||||
watchedCollectionProps.remove(collection);
|
||||
// don't want this to happen in middle of other blocks that might add it back.
|
||||
synchronized (watchedCollectionProps) {
|
||||
watchedCollectionProps.remove(collection);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return v;
|
||||
|
|
Loading…
Reference in New Issue