mirror of https://github.com/apache/lucene.git
SOLR-9181: Fix some races in ZkStateReader collection watch updates
This commit is contained in:
parent
2c96c91dd8
commit
cefab1dffc
|
@ -108,6 +108,9 @@ Bug Fixes
|
||||||
|
|
||||||
* SOLR-9246: If the JDBCStream sees an unknown column type it will now throw a detailed exception. (Dennis Gove)
|
* SOLR-9246: If the JDBCStream sees an unknown column type it will now throw a detailed exception. (Dennis Gove)
|
||||||
|
|
||||||
|
* SOLR-9181: Fix some races in CollectionStateWatcher API (Alan Woodward, Scott
|
||||||
|
Blum)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.solr.cloud.overseer;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.apache.solr.SolrTestCaseJ4;
|
import org.apache.solr.SolrTestCaseJ4;
|
||||||
|
@ -29,13 +30,14 @@ import org.apache.solr.cloud.ZkTestServer;
|
||||||
import org.apache.solr.common.cloud.ClusterState;
|
import org.apache.solr.common.cloud.ClusterState;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.DocRouter;
|
import org.apache.solr.common.cloud.DocRouter;
|
||||||
import org.apache.solr.common.cloud.Slice;
|
|
||||||
import org.apache.solr.common.cloud.SolrZkClient;
|
import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
|
|
||||||
public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
||||||
|
|
||||||
|
private static final long TIMEOUT = 30;
|
||||||
|
|
||||||
/** Uses explicit refresh to ensure latest changes are visible. */
|
/** Uses explicit refresh to ensure latest changes are visible. */
|
||||||
public void testStateFormatUpdateWithExplicitRefresh() throws Exception {
|
public void testStateFormatUpdateWithExplicitRefresh() throws Exception {
|
||||||
testStateFormatUpdate(true, true);
|
testStateFormatUpdate(true, true);
|
||||||
|
@ -84,7 +86,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
||||||
|
|
||||||
{
|
{
|
||||||
// create new collection with stateFormat = 1
|
// create new collection with stateFormat = 1
|
||||||
DocCollection stateV1 = new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
|
DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
|
||||||
ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1);
|
ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1);
|
||||||
writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
||||||
writer.writePendingUpdates();
|
writer.writePendingUpdates();
|
||||||
|
@ -97,12 +99,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
||||||
if (explicitRefresh) {
|
if (explicitRefresh) {
|
||||||
reader.forceUpdateCollection("c1");
|
reader.forceUpdateCollection("c1");
|
||||||
} else {
|
} else {
|
||||||
for (int i = 0; i < 1000; ++i) {
|
reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
|
||||||
if (reader.getClusterState().hasCollection("c1")) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Thread.sleep(50);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DocCollection collection = reader.getClusterState().getCollection("c1");
|
DocCollection collection = reader.getClusterState().getCollection("c1");
|
||||||
|
@ -112,7 +109,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
||||||
|
|
||||||
{
|
{
|
||||||
// Now update the collection to stateFormat = 2
|
// Now update the collection to stateFormat = 2
|
||||||
DocCollection stateV2 = new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
|
DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
|
||||||
ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2);
|
ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2);
|
||||||
writer.enqueueUpdate(reader.getClusterState(), c2, null);
|
writer.enqueueUpdate(reader.getClusterState(), c2, null);
|
||||||
writer.writePendingUpdates();
|
writer.writePendingUpdates();
|
||||||
|
@ -125,12 +122,8 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
||||||
if (explicitRefresh) {
|
if (explicitRefresh) {
|
||||||
reader.forceUpdateCollection("c1");
|
reader.forceUpdateCollection("c1");
|
||||||
} else {
|
} else {
|
||||||
for (int i = 0; i < 1000; ++i) {
|
reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS,
|
||||||
if (reader.getClusterState().getCollection("c1").getStateFormat() == 2) {
|
(n, c) -> c != null && c.getStateFormat() == 2);
|
||||||
break;
|
|
||||||
}
|
|
||||||
Thread.sleep(50);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DocCollection collection = reader.getClusterState().getCollection("c1");
|
DocCollection collection = reader.getClusterState().getCollection("c1");
|
||||||
|
@ -166,7 +159,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
||||||
|
|
||||||
// create new collection with stateFormat = 2
|
// create new collection with stateFormat = 2
|
||||||
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
||||||
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
|
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
|
||||||
writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
||||||
writer.writePendingUpdates();
|
writer.writePendingUpdates();
|
||||||
reader.forceUpdateCollection("c1");
|
reader.forceUpdateCollection("c1");
|
||||||
|
@ -216,7 +209,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
||||||
|
|
||||||
|
|
||||||
// create new collection with stateFormat = 2
|
// create new collection with stateFormat = 2
|
||||||
DocCollection state = new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
|
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
|
||||||
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
|
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
|
||||||
writer.enqueueUpdate(reader.getClusterState(), wc, null);
|
writer.enqueueUpdate(reader.getClusterState(), wc, null);
|
||||||
writer.writePendingUpdates();
|
writer.writePendingUpdates();
|
||||||
|
@ -224,14 +217,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
|
||||||
assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
|
assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
|
||||||
|
|
||||||
//reader.forceUpdateCollection("c1");
|
//reader.forceUpdateCollection("c1");
|
||||||
|
reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
|
||||||
for (int i = 0; i < 100; ++i) {
|
|
||||||
Thread.sleep(50);
|
|
||||||
ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
|
|
||||||
if (ref != null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
|
ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
|
||||||
assertNotNull(ref);
|
assertNotNull(ref);
|
||||||
assertFalse(ref.isLazilyLoaded());
|
assertFalse(ref.isLazilyLoaded());
|
||||||
|
|
|
@ -197,7 +197,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "DocCollection("+name+")=" + JSONUtil.toJSON(this);
|
return "DocCollection("+name+"/" + znode + "/" + znodeVersion + ")=" + JSONUtil.toJSON(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -259,4 +259,26 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
|
||||||
}
|
}
|
||||||
return replicas;
|
return replicas;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the shardId of a core on a specific node
|
||||||
|
*/
|
||||||
|
public String getShardId(String nodeName, String coreName) {
|
||||||
|
for (Slice slice : this) {
|
||||||
|
for (Replica replica : slice) {
|
||||||
|
if (Objects.equals(replica.getNodeName(), nodeName) && Objects.equals(replica.getCoreName(), coreName))
|
||||||
|
return slice.getName();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object that) {
|
||||||
|
if (that instanceof DocCollection == false)
|
||||||
|
return false;
|
||||||
|
DocCollection other = (DocCollection) that;
|
||||||
|
return super.equals(that) && Objects.equals(this.znode, other.znode) && this.znodeVersion == other.znodeVersion;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.net.URLDecoder;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -265,16 +266,19 @@ public class ZkStateReader implements Closeable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// No need to set watchers because we should already have watchers registered for everything.
|
// No need to set watchers because we should already have watchers registered for everything.
|
||||||
|
refreshCollectionList(null);
|
||||||
|
refreshLiveNodes(null);
|
||||||
refreshLegacyClusterState(null);
|
refreshLegacyClusterState(null);
|
||||||
// Need a copy so we don't delete from what we're iterating over.
|
// Need a copy so we don't delete from what we're iterating over.
|
||||||
Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
|
Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
|
||||||
|
Map<String, DocCollection> updatedCollections = new HashMap<>();
|
||||||
for (String coll : safeCopy) {
|
for (String coll : safeCopy) {
|
||||||
DocCollection newState = fetchCollectionState(coll, null);
|
DocCollection newState = fetchCollectionState(coll, null);
|
||||||
updateWatchedCollection(coll, newState);
|
if (updateWatchedCollection(coll, newState)) {
|
||||||
|
updatedCollections.put(coll, newState);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
refreshCollectionList(null);
|
constructState(updatedCollections);
|
||||||
refreshLiveNodes(null);
|
|
||||||
constructState();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,12 +301,10 @@ public class ZkStateReader implements Closeable {
|
||||||
if (!legacyCollectionStates.containsKey(collection)) {
|
if (!legacyCollectionStates.containsKey(collection)) {
|
||||||
// No dice, see if a new collection just got created.
|
// No dice, see if a new collection just got created.
|
||||||
LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection);
|
LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection);
|
||||||
if (tryLazyCollection.get() == null) {
|
if (tryLazyCollection.get() != null) {
|
||||||
// No dice, just give up.
|
// What do you know, it exists!
|
||||||
return;
|
lazyCollectionStates.putIfAbsent(collection, tryLazyCollection);
|
||||||
}
|
}
|
||||||
// What do you know, it exists!
|
|
||||||
lazyCollectionStates.putIfAbsent(collection, tryLazyCollection);
|
|
||||||
}
|
}
|
||||||
} else if (ref.isLazilyLoaded()) {
|
} else if (ref.isLazilyLoaded()) {
|
||||||
if (ref.get() != null) {
|
if (ref.get() != null) {
|
||||||
|
@ -313,9 +315,10 @@ public class ZkStateReader implements Closeable {
|
||||||
} else if (watchedCollectionStates.containsKey(collection)) {
|
} else if (watchedCollectionStates.containsKey(collection)) {
|
||||||
// Exists as a watched collection, force a refresh.
|
// Exists as a watched collection, force a refresh.
|
||||||
DocCollection newState = fetchCollectionState(collection, null);
|
DocCollection newState = fetchCollectionState(collection, null);
|
||||||
updateWatchedCollection(collection, newState);
|
if (updateWatchedCollection(collection, newState)) {
|
||||||
|
constructState(Collections.singletonMap(collection, newState));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
constructState();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -337,7 +340,11 @@ public class ZkStateReader implements Closeable {
|
||||||
DocCollection nu = getCollectionLive(this, coll);
|
DocCollection nu = getCollectionLive(this, coll);
|
||||||
if (nu == null) return -1 ;
|
if (nu == null) return -1 ;
|
||||||
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
|
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
|
||||||
updateWatchedCollection(coll, nu);
|
if (updateWatchedCollection(coll, nu)) {
|
||||||
|
synchronized (getUpdateLock()) {
|
||||||
|
constructState(Collections.singletonMap(coll, nu));
|
||||||
|
}
|
||||||
|
}
|
||||||
collection = nu;
|
collection = nu;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -371,7 +378,7 @@ public class ZkStateReader implements Closeable {
|
||||||
refreshCollectionList(new CollectionsChildWatcher());
|
refreshCollectionList(new CollectionsChildWatcher());
|
||||||
|
|
||||||
synchronized (ZkStateReader.this.getUpdateLock()) {
|
synchronized (ZkStateReader.this.getUpdateLock()) {
|
||||||
constructState();
|
constructState(Collections.emptyMap());
|
||||||
|
|
||||||
zkClient.exists(ALIASES,
|
zkClient.exists(ALIASES,
|
||||||
new Watcher() {
|
new Watcher() {
|
||||||
|
@ -463,8 +470,14 @@ public class ZkStateReader implements Closeable {
|
||||||
/**
|
/**
|
||||||
* Construct the total state view from all sources.
|
* Construct the total state view from all sources.
|
||||||
* Must hold {@link #getUpdateLock()} before calling this.
|
* Must hold {@link #getUpdateLock()} before calling this.
|
||||||
|
*
|
||||||
|
* @param changedCollections collections that have changed since the last call,
|
||||||
|
* and that should fire notifications
|
||||||
*/
|
*/
|
||||||
private void constructState() {
|
private void constructState(Map<String, DocCollection> changedCollections) {
|
||||||
|
|
||||||
|
Set<String> liveNodes = this.liveNodes; // volatile read
|
||||||
|
|
||||||
// Legacy clusterstate is authoritative, for backwards compatibility.
|
// Legacy clusterstate is authoritative, for backwards compatibility.
|
||||||
// To move a collection's state to format2, first create the new state2 format node, then remove legacy entry.
|
// To move a collection's state to format2, first create the new state2 format node, then remove legacy entry.
|
||||||
Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(legacyCollectionStates);
|
Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(legacyCollectionStates);
|
||||||
|
@ -480,6 +493,7 @@ public class ZkStateReader implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
|
this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
|
||||||
|
|
||||||
LOG.debug("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]",
|
LOG.debug("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]",
|
||||||
legacyCollectionStates.keySet().size(),
|
legacyCollectionStates.keySet().size(),
|
||||||
collectionWatches.keySet().size(),
|
collectionWatches.keySet().size(),
|
||||||
|
@ -495,6 +509,10 @@ public class ZkStateReader implements Closeable {
|
||||||
lazyCollectionStates.keySet(),
|
lazyCollectionStates.keySet(),
|
||||||
clusterState.getCollectionStates());
|
clusterState.getCollectionStates());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (Map.Entry<String, DocCollection> entry : changedCollections.entrySet()) {
|
||||||
|
notifyStateWatchers(liveNodes, entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -510,30 +528,31 @@ public class ZkStateReader implements Closeable {
|
||||||
// Nothing to do, someone else updated same or newer.
|
// Nothing to do, someone else updated same or newer.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Set<String> liveNodes = this.liveNodes; // volatile read
|
Map<String, DocCollection> updatedCollections = new HashMap<>();
|
||||||
for (Map.Entry<String, CollectionWatch> watchEntry : this.collectionWatches.entrySet()) {
|
for (String coll : this.collectionWatches.keySet()) {
|
||||||
String coll = watchEntry.getKey();
|
|
||||||
CollectionWatch collWatch = watchEntry.getValue();
|
|
||||||
ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll);
|
ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll);
|
||||||
if (ref == null)
|
|
||||||
continue;
|
|
||||||
// legacy collections are always in-memory
|
// legacy collections are always in-memory
|
||||||
DocCollection oldState = ref.get();
|
DocCollection oldState = ref == null ? null : ref.get();
|
||||||
ClusterState.CollectionRef newRef = loadedData.getCollectionStates().get(coll);
|
ClusterState.CollectionRef newRef = loadedData.getCollectionStates().get(coll);
|
||||||
DocCollection newState = newRef == null ? null : newRef.get();
|
DocCollection newState = newRef == null ? null : newRef.get();
|
||||||
if (!collWatch.stateWatchers.isEmpty()
|
if (newState == null) {
|
||||||
&& !Objects.equals(oldState, newState)) {
|
// check that we haven't just migrated
|
||||||
notifyStateWatchers(liveNodes, coll, newState);
|
newState = watchedCollectionStates.get(coll);
|
||||||
|
}
|
||||||
|
if (!Objects.equals(oldState, newState)) {
|
||||||
|
updatedCollections.put(coll, newState);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.legacyCollectionStates = loadedData.getCollectionStates();
|
this.legacyCollectionStates = loadedData.getCollectionStates();
|
||||||
this.legacyClusterStateVersion = stat.getVersion();
|
this.legacyClusterStateVersion = stat.getVersion();
|
||||||
|
constructState(updatedCollections);
|
||||||
}
|
}
|
||||||
} catch (KeeperException.NoNodeException e) {
|
} catch (KeeperException.NoNodeException e) {
|
||||||
// Ignore missing legacy clusterstate.json.
|
// Ignore missing legacy clusterstate.json.
|
||||||
synchronized (getUpdateLock()) {
|
synchronized (getUpdateLock()) {
|
||||||
this.legacyCollectionStates = emptyMap();
|
this.legacyCollectionStates = emptyMap();
|
||||||
this.legacyClusterStateVersion = 0;
|
this.legacyClusterStateVersion = 0;
|
||||||
|
constructState(Collections.emptyMap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -555,7 +574,7 @@ public class ZkStateReader implements Closeable {
|
||||||
*
|
*
|
||||||
* A stateFormat=1 collection which is not interesting to us can also
|
* A stateFormat=1 collection which is not interesting to us can also
|
||||||
* be put into the {@link #lazyCollectionStates} map here. But that is okay
|
* be put into the {@link #lazyCollectionStates} map here. But that is okay
|
||||||
* because {@link #constructState()} will give priority to collections in the
|
* because {@link #constructState(Map)} will give priority to collections in the
|
||||||
* shared collection state over this map.
|
* shared collection state over this map.
|
||||||
* In fact this is a clever way to avoid doing a ZK exists check on
|
* In fact this is a clever way to avoid doing a ZK exists check on
|
||||||
* the /collections/collection_name/state.json znode
|
* the /collections/collection_name/state.json znode
|
||||||
|
@ -908,15 +927,11 @@ public class ZkStateReader implements Closeable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int liveNodesSize = ZkStateReader.this.clusterState == null ? 0
|
Set<String> liveNodes = ZkStateReader.this.liveNodes;
|
||||||
: ZkStateReader.this.clusterState.getLiveNodes().size();
|
|
||||||
LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])",
|
LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])",
|
||||||
event, coll, liveNodesSize);
|
event, coll, liveNodes.size());
|
||||||
|
|
||||||
refreshAndWatch();
|
refreshAndWatch();
|
||||||
synchronized (getUpdateLock()) {
|
|
||||||
constructState();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -929,6 +944,10 @@ public class ZkStateReader implements Closeable {
|
||||||
try {
|
try {
|
||||||
DocCollection newState = fetchCollectionState(coll, this);
|
DocCollection newState = fetchCollectionState(coll, this);
|
||||||
updateWatchedCollection(coll, newState);
|
updateWatchedCollection(coll, newState);
|
||||||
|
synchronized (getUpdateLock()) {
|
||||||
|
constructState(Collections.singletonMap(coll, newState));
|
||||||
|
}
|
||||||
|
|
||||||
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
|
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
|
||||||
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
|
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
|
@ -953,9 +972,6 @@ public class ZkStateReader implements Closeable {
|
||||||
int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size();
|
int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size();
|
||||||
LOG.info("A cluster state change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodesSize);
|
LOG.info("A cluster state change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodesSize);
|
||||||
refreshAndWatch();
|
refreshAndWatch();
|
||||||
synchronized (getUpdateLock()) {
|
|
||||||
constructState();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Must hold {@link #getUpdateLock()} before calling this method. */
|
/** Must hold {@link #getUpdateLock()} before calling this method. */
|
||||||
|
@ -990,7 +1006,7 @@ public class ZkStateReader implements Closeable {
|
||||||
LOG.info("A collections change: [{}], has occurred - updating...", event);
|
LOG.info("A collections change: [{}], has occurred - updating...", event);
|
||||||
refreshAndWatch();
|
refreshAndWatch();
|
||||||
synchronized (getUpdateLock()) {
|
synchronized (getUpdateLock()) {
|
||||||
constructState();
|
constructState(Collections.emptyMap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1109,9 +1125,6 @@ public class ZkStateReader implements Closeable {
|
||||||
});
|
});
|
||||||
if (reconstructState.get()) {
|
if (reconstructState.get()) {
|
||||||
new StateWatcher(collection).refreshAndWatch();
|
new StateWatcher(collection).refreshAndWatch();
|
||||||
synchronized (getUpdateLock()) {
|
|
||||||
constructState();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1142,7 +1155,7 @@ public class ZkStateReader implements Closeable {
|
||||||
});
|
});
|
||||||
if (reconstructState.get()) {
|
if (reconstructState.get()) {
|
||||||
synchronized (getUpdateLock()) {
|
synchronized (getUpdateLock()) {
|
||||||
constructState();
|
constructState(Collections.emptyMap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1160,17 +1173,14 @@ public class ZkStateReader implements Closeable {
|
||||||
v.stateWatchers.add(stateWatcher);
|
v.stateWatchers.add(stateWatcher);
|
||||||
return v;
|
return v;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (watchSet.get()) {
|
if (watchSet.get()) {
|
||||||
new StateWatcher(collection).refreshAndWatch();
|
new StateWatcher(collection).refreshAndWatch();
|
||||||
synchronized (getUpdateLock()) {
|
|
||||||
constructState();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
DocCollection state = clusterState.getCollectionOrNull(collection);
|
DocCollection state = clusterState.getCollectionOrNull(collection);
|
||||||
if (stateWatcher.onStateChanged(liveNodes, state) == true) {
|
if (stateWatcher.onStateChanged(liveNodes, state) == true) {
|
||||||
removeCollectionStateWatcher(collection, stateWatcher);
|
removeCollectionStateWatcher(collection, stateWatcher);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1240,17 +1250,15 @@ public class ZkStateReader implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns true if the state has changed
|
// returns true if the state has changed
|
||||||
private void updateWatchedCollection(String coll, DocCollection newState) {
|
private boolean updateWatchedCollection(String coll, DocCollection newState) {
|
||||||
|
|
||||||
Set<String> liveNodes = this.liveNodes; // volatile read
|
|
||||||
|
|
||||||
if (newState == null) {
|
if (newState == null) {
|
||||||
LOG.info("Deleting data for [{}]", coll);
|
LOG.info("Deleting data for [{}]", coll);
|
||||||
watchedCollectionStates.remove(coll);
|
watchedCollectionStates.remove(coll);
|
||||||
notifyStateWatchers(liveNodes, coll, null);
|
return true;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean updated = false;
|
||||||
// CAS update loop
|
// CAS update loop
|
||||||
while (true) {
|
while (true) {
|
||||||
if (!collectionWatches.containsKey(coll)) {
|
if (!collectionWatches.containsKey(coll)) {
|
||||||
|
@ -1260,19 +1268,19 @@ public class ZkStateReader implements Closeable {
|
||||||
if (oldState == null) {
|
if (oldState == null) {
|
||||||
if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
|
if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
|
||||||
LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
|
LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
|
||||||
notifyStateWatchers(liveNodes, coll, newState);
|
updated = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
|
if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
|
||||||
// no change to state, but we might have been triggered by the addition of a
|
// no change to state, but we might have been triggered by the addition of a
|
||||||
// state watcher, so run notifications
|
// state watcher, so run notifications
|
||||||
notifyStateWatchers(liveNodes, coll, newState);
|
updated = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (watchedCollectionStates.replace(coll, oldState, newState)) {
|
if (watchedCollectionStates.replace(coll, oldState, newState)) {
|
||||||
LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
|
LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
|
||||||
notifyStateWatchers(liveNodes, coll, newState);
|
updated = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1284,6 +1292,7 @@ public class ZkStateReader implements Closeable {
|
||||||
LOG.info("Removing uninteresting collection [{}]", coll);
|
LOG.info("Removing uninteresting collection [{}]", coll);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return updated;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class ConfigData {
|
public static class ConfigData {
|
||||||
|
|
Loading…
Reference in New Issue