SOLR-8454: Improve logging by ZkStateReader and clean up dead code

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1721393 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shai Erera 2015-12-22 13:28:27 +00:00
parent 3064db4d73
commit 8a9a8e2363
1 changed files with 97 additions and 160 deletions

View File

@ -31,7 +31,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.solr.common.Callable;
@ -56,7 +55,7 @@ import static java.util.Collections.unmodifiableSet;
import static org.apache.solr.common.util.Utils.fromJSON;
public class ZkStateReader implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String BASE_URL_PROP = "base_url";
public static final String NODE_NAME_PROP = "node_name";
@ -110,7 +109,7 @@ public class ZkStateReader implements Closeable {
public static final String ELECTION_NODE = "election";
/** Collections we actively care about, and will try to keep watch on. */
private final Set<String> interestingCollections = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Set<String> interestingCollections = Collections.newSetFromMap(new ConcurrentHashMap<>());
/** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */
private Map<String, ClusterState.CollectionRef> legacyCollectionStates = emptyMap();
@ -118,10 +117,10 @@ public class ZkStateReader implements Closeable {
private int legacyClusterStateVersion = 0;
/** Collections with format2 state.json, "interesting" and actively watched. */
private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<>();
/** Collections with format2 state.json, not "interesting" and not actively watched. */
private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<String, LazyCollectionRef>();
private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
private volatile Set<String> liveNodes = emptySet();
@ -146,9 +145,7 @@ public class ZkStateReader implements Closeable {
String configName = null;
String path = COLLECTIONS_ZKNODE + "/" + collection;
if (log.isInfoEnabled()) {
log.info("Load collection config from:" + path);
}
LOG.info("Load collection config from: [{}]", path);
try {
byte[] data = zkClient.getData(path, null, null, true);
@ -160,12 +157,10 @@ public class ZkStateReader implements Closeable {
if (configName != null) {
if (!zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) {
log.error("Specified config does not exist in ZooKeeper:" + configName);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
"Specified config does not exist in ZooKeeper:" + configName);
} else if (log.isInfoEnabled()) {
log.info("path={} {}={} specified config exists in ZooKeeper",
new Object[] {path, CONFIGNAME_PROP, configName});
LOG.error("Specified config does not exist in ZooKeeper: [{}]", configName);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "Specified config does not exist in ZooKeeper: " + configName);
} else {
LOG.info("path=[{}] [{}]=[{}] specified config exists in ZooKeeper", path, CONFIGNAME_PROP, configName);
}
} else {
throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
@ -183,16 +178,6 @@ public class ZkStateReader implements Closeable {
}
private static class ZKTF implements ThreadFactory {
private static ThreadGroup tg = new ThreadGroup("ZkStateReader");
@Override
public Thread newThread(Runnable r) {
Thread td = new Thread(tg, r);
td.setDaemon(true);
return td;
}
}
private final SolrZkClient zkClient;
private final boolean closeClient;
@ -222,15 +207,13 @@ public class ZkStateReader implements Closeable {
try {
ZkStateReader.this.createClusterStateWatchersAndUpdate();
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
LOG.error("A ZK error has occurred", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
LOG.error("Interrupted", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted", e);
}
}
});
@ -280,7 +263,7 @@ public class ZkStateReader implements Closeable {
DocCollection collection = clusterState.getCollectionOrNull(coll);
if (collection == null) return null;
if (collection.getZNodeVersion() < version) {
log.debug("server older than client {}<{}", collection.getZNodeVersion(), version);
LOG.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
DocCollection nu = getCollectionLive(this, coll);
if (nu == null) return -1 ;
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
@ -293,7 +276,7 @@ public class ZkStateReader implements Closeable {
return null;
}
log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion());
LOG.debug("Wrong version from client [{}]!=[{}]", version, collection.getZNodeVersion());
return collection.getZNodeVersion();
}
@ -302,7 +285,7 @@ public class ZkStateReader implements Closeable {
InterruptedException {
// We need to fetch the current cluster state and the set of live nodes
log.info("Updating cluster state from ZooKeeper... ");
LOG.info("Updating cluster state from ZooKeeper... ");
// Sanity check ZK structure.
if (!zkClient.exists(CLUSTER_STATE, true)) {
@ -331,32 +314,23 @@ public class ZkStateReader implements Closeable {
}
try {
synchronized (ZkStateReader.this.getUpdateLock()) {
log.info("Updating aliases... ");
LOG.info("Updating aliases... ");
// remake watch
final Watcher thisWatch = this;
Stat stat = new Stat();
byte[] data = zkClient.getData(ALIASES, thisWatch, stat ,
true);
Aliases aliases = ClusterState.load(data);
ZkStateReader.this.aliases = aliases;
final Stat stat = new Stat();
final byte[] data = zkClient.getData(ALIASES, thisWatch, stat, true);
ZkStateReader.this.aliases = ClusterState.load(data);
}
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("", e);
throw new ZooKeeperException(
SolrException.ErrorCode.SERVER_ERROR, "", e);
LOG.error("A ZK error has occurred", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
return;
LOG.warn("Interrupted", e);
}
}
@ -365,7 +339,7 @@ public class ZkStateReader implements Closeable {
updateAliases();
if (securityNodeListener != null) {
addSecuritynodeWatcher(SOLR_SECURITY_CONF_PATH, new Callable<Pair<byte[], Stat>>() {
addSecuritynodeWatcher(new Callable<Pair<byte[], Stat>>() {
@Override
public void call(Pair<byte[], Stat> pair) {
ConfigData cd = new ConfigData();
@ -379,7 +353,7 @@ public class ZkStateReader implements Closeable {
}
}
private void addSecuritynodeWatcher(String path, final Callable<Pair<byte[], Stat>> callback)
private void addSecuritynodeWatcher(final Callable<Pair<byte[], Stat>> callback)
throws KeeperException, InterruptedException {
zkClient.exists(SOLR_SECURITY_CONF_PATH,
new Watcher() {
@ -393,34 +367,27 @@ public class ZkStateReader implements Closeable {
}
try {
synchronized (ZkStateReader.this.getUpdateLock()) {
log.info("Updating {} ... ", path);
LOG.info("Updating [{}] ... ", SOLR_SECURITY_CONF_PATH);
// remake watch
final Watcher thisWatch = this;
Stat stat = new Stat();
byte[] data = getZkClient().getData(path, thisWatch, stat, true);
final Stat stat = new Stat();
final byte[] data = getZkClient().getData(SOLR_SECURITY_CONF_PATH, thisWatch, stat, true);
try {
callback.call(new Pair<>(data, stat));
} catch (Exception e) {
if (e instanceof KeeperException) throw (KeeperException) e;
if (e instanceof InterruptedException) throw (InterruptedException) e;
log.error("Error running collections node listener", e);
LOG.error("Error running collections node listener", e);
}
}
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("", e);
throw new ZooKeeperException(
ErrorCode.SERVER_ERROR, "", e);
LOG.error("A ZK error has occurred", e);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
return;
LOG.warn("Interrupted", e);
}
}
@ -454,7 +421,7 @@ public class ZkStateReader implements Closeable {
}
this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
log.debug("clusterStateSet: version {} legacy {} interesting {} watched {} lazy {} total {}",
LOG.debug("clusterStateSet: version [{}] legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]",
clusterState.getZkClusterStateVersion(),
legacyCollectionStates.keySet(),
interestingCollections,
@ -469,9 +436,9 @@ public class ZkStateReader implements Closeable {
private void refreshLegacyClusterState(Watcher watcher)
throws KeeperException, InterruptedException {
try {
Stat stat = new Stat();
byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE);
final Stat stat = new Stat();
final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE);
synchronized (getUpdateLock()) {
this.legacyCollectionStates = loadedData.getCollectionStates();
this.legacyClusterStateVersion = stat.getVersion();
@ -512,7 +479,7 @@ public class ZkStateReader implements Closeable {
try {
children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
} catch (KeeperException.NoNodeException e) {
log.warn("Error fetching collection names");
LOG.warn("Error fetching collection names: [{}]", e.getMessage());
// fall through
}
if (children == null || children.isEmpty()) {
@ -569,7 +536,7 @@ public class ZkStateReader implements Closeable {
Set<String> newLiveNodes;
try {
List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
log.debug("Updating live nodes from ZooKeeper... ({})", nodeList.size());
LOG.debug("Updating live nodes from ZooKeeper... [{}]", nodeList.size());
newLiveNodes = new HashSet<>(nodeList);
} catch (KeeperException.NoNodeException e) {
newLiveNodes = emptySet();
@ -600,14 +567,12 @@ public class ZkStateReader implements Closeable {
}
}
public String getLeaderUrl(String collection, String shard, int timeout)
throws InterruptedException, KeeperException {
ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection,
shard, timeout));
public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException {
ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout));
return props.getCoreUrl();
}
public Replica getLeader(String collection, String shard) throws InterruptedException {
public Replica getLeader(String collection, String shard) {
if (clusterState != null) {
Replica replica = clusterState.getLeader(collection, shard);
if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) {
@ -714,21 +679,17 @@ public class ZkStateReader implements Closeable {
}
public void updateAliases() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData(ALIASES, null, null, true);
Aliases aliases = ClusterState.load(data);
ZkStateReader.this.aliases = aliases;
final byte[] data = zkClient.getData(ALIASES, null, null, true);
this.aliases = ClusterState.load(data);
}
public Map getClusterProps(){
Map result = null;
try {
if (getZkClient().exists(ZkStateReader.CLUSTER_PROPS, true)) {
result = (Map) Utils.fromJSON(getZkClient().getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)) ;
return (Map) Utils.fromJSON(getZkClient().getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)) ;
} else {
result= new LinkedHashMap();
return new LinkedHashMap();
}
return result;
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading cluster properties", e);
}
@ -749,7 +710,6 @@ public class ZkStateReader implements Closeable {
Stat s = new Stat();
try {
if (getZkClient().exists(CLUSTER_PROPS, true)) {
int v = 0;
Map properties = (Map) Utils.fromJSON(getZkClient().getData(CLUSTER_PROPS, null, s, true));
if (propertyValue == null) {
//Don't update ZK unless absolutely necessary.
@ -769,24 +729,18 @@ public class ZkStateReader implements Closeable {
properties.put(propertyName, propertyValue);
getZkClient().create(CLUSTER_PROPS, Utils.toJSON(properties), CreateMode.PERSISTENT, true);
}
} catch (KeeperException.BadVersionException bve) {
log.warn("Race condition while trying to set a new cluster prop on current version " + s.getVersion());
//race condition
continue;
} catch (KeeperException.NodeExistsException nee) {
log.warn("Race condition while trying to set a new cluster prop on current version " + s.getVersion());
} catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
LOG.warn("Race condition while trying to set a new cluster prop on current version [{}]", s.getVersion());
//race condition
continue;
} catch (Exception ex) {
log.error("Error updating path " + CLUSTER_PROPS, ex);
LOG.error("Error updating path [{}]", CLUSTER_PROPS, ex);
throw new SolrException(ErrorCode.SERVER_ERROR, "Error updating cluster property " + propertyName, ex);
}
break;
}
}
/**
* Returns the content of /security.json from ZooKeeper as a Map
* If the files doesn't exist, it returns null.
@ -799,8 +753,7 @@ public class ZkStateReader implements Closeable {
try {
Stat stat = new Stat();
if(getZkClient().exists(SOLR_SECURITY_CONF_PATH, true)) {
byte[] data = getZkClient()
.getData(ZkStateReader.SOLR_SECURITY_CONF_PATH, null, stat, true);
final byte[] data = getZkClient().getData(ZkStateReader.SOLR_SECURITY_CONF_PATH, null, stat, true);
return data != null && data.length > 0 ?
new ConfigData((Map<String, Object>) Utils.fromJSON(data), stat.getVersion()) :
null;
@ -810,6 +763,7 @@ public class ZkStateReader implements Closeable {
}
return null;
}
/**
* Returns the baseURL corresponding to a given node's nodeName --
* NOTE: does not (currently) imply that the nodeName (or resulting
@ -846,7 +800,7 @@ public class ZkStateReader implements Closeable {
public void process(WatchedEvent event) {
if (!interestingCollections.contains(coll)) {
// This collection is no longer interesting, stop watching.
log.info("Uninteresting collection {}", coll);
LOG.info("Uninteresting collection [{}]", coll);
return;
}
@ -856,9 +810,10 @@ public class ZkStateReader implements Closeable {
return;
}
log.info("A cluster state change: {} for collection {} has occurred - updating... (live nodes size: {})",
(event), coll, ZkStateReader.this.clusterState == null ? 0
: ZkStateReader.this.clusterState.getLiveNodes().size());
int liveNodesSize = ZkStateReader.this.clusterState == null ? 0
: ZkStateReader.this.clusterState.getLiveNodes().size();
LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])",
event, coll, liveNodesSize);
refreshAndWatch(true);
synchronized (getUpdateLock()) {
@ -879,20 +834,16 @@ public class ZkStateReader implements Closeable {
updateWatchedCollection(coll, newState);
} catch (KeeperException.NoNodeException e) {
if (expectExists) {
log.warn("State node vanished for collection: " + coll, e);
LOG.warn("State node vanished for collection: [{}]", coll, e);
}
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("Unwatched collection: " + coll, e);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
LOG.error("Unwatched collection: [{}]", coll, e);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Unwatched collection :" + coll, e);
LOG.error("Unwatched collection: [{}]", coll, e);
}
}
}
@ -907,7 +858,8 @@ public class ZkStateReader implements Closeable {
if (EventType.None.equals(event.getType())) {
return;
}
log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size();
LOG.info("A cluster state change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodesSize);
refreshAndWatch();
synchronized (getUpdateLock()) {
constructState();
@ -921,19 +873,15 @@ public class ZkStateReader implements Closeable {
} catch (KeeperException.NoNodeException e) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
LOG.error("A ZK error has occurred", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
LOG.warn("Interrupted", e);
}
}
}
@ -948,7 +896,7 @@ public class ZkStateReader implements Closeable {
if (EventType.None.equals(event.getType())) {
return;
}
log.info("A collections change: {}, has occurred - updating...", (event));
LOG.info("A collections change: [{}], has occurred - updating...", event);
refreshAndWatch();
synchronized (getUpdateLock()) {
constructState();
@ -959,19 +907,15 @@ public class ZkStateReader implements Closeable {
public void refreshAndWatch() {
try {
refreshCollectionList(this);
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
LOG.error("A ZK error has occurred", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
LOG.warn("Interrupted", e);
}
}
}
@ -986,41 +930,34 @@ public class ZkStateReader implements Closeable {
if (EventType.None.equals(event.getType())) {
return;
}
log.info("A live node change: {}, has occurred - updating... (live nodes size: {})", (event), liveNodes.size());
LOG.info("A live node change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodes.size());
refreshAndWatch();
}
public void refreshAndWatch() {
try {
refreshLiveNodes(this);
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
LOG.error("A ZK error has occurred", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
LOG.warn("Interrupted", e);
}
}
}
public static DocCollection getCollectionLive(ZkStateReader zkStateReader,
String coll) {
public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
try {
return zkStateReader.fetchCollectionState(coll, null);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Could not load collection from ZK:" + coll, e);
throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK: " + coll, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.BAD_REQUEST,
"Could not load collection from ZK:" + coll, e);
throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK: " + coll, e);
}
}
@ -1042,9 +979,9 @@ public class ZkStateReader implements Closeable {
return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
}
public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
public void addCollectionWatch(String coll) {
if (interestingCollections.add(coll)) {
log.info("addZkWatch {}", coll);
LOG.info("addZkWatch [{}]", coll);
new StateWatcher(coll).refreshAndWatch(false);
synchronized (getUpdateLock()) {
constructState();
@ -1054,7 +991,7 @@ public class ZkStateReader implements Closeable {
private void updateWatchedCollection(String coll, DocCollection newState) {
if (newState == null) {
log.info("Deleting data for {}", coll);
LOG.info("Deleting data for [{}]", coll);
watchedCollectionStates.remove(coll);
return;
}
@ -1067,7 +1004,7 @@ public class ZkStateReader implements Closeable {
DocCollection oldState = watchedCollectionStates.get(coll);
if (oldState == null) {
if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
log.info("Add data for {} ver {} ", coll, newState.getZNodeVersion());
LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
break;
}
} else {
@ -1076,7 +1013,7 @@ public class ZkStateReader implements Closeable {
break;
}
if (watchedCollectionStates.replace(coll, oldState, newState)) {
log.info("Updating data for {} from {} to {} ", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
break;
}
}
@ -1085,13 +1022,13 @@ public class ZkStateReader implements Closeable {
// Resolve race with removeZKWatch.
if (!interestingCollections.contains(coll)) {
watchedCollectionStates.remove(coll);
log.info("Removing uninteresting collection {}", coll);
LOG.info("Removing uninteresting collection [{}]", coll);
}
}
/** This is not a public API. Only used by ZkController */
public void removeZKWatch(String coll) {
log.info("Removing watch for uninteresting collection {}", coll);
LOG.info("Removing watch for uninteresting collection [{}]", coll);
interestingCollections.remove(coll);
watchedCollectionStates.remove(coll);
lazyCollectionStates.put(coll, new LazyCollectionRef(coll));