SOLR-6924: conf node listening made more robust to take care of session expiry and reconnect

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1669195 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2015-03-25 18:19:44 +00:00
parent 3a8d0c2f38
commit a14e40d75d
1 changed files with 52 additions and 32 deletions

View File

@ -2297,8 +2297,7 @@ public final class ZkController {
log.info("watch zkdir " + zkDir);
if (!confDirectoryListeners.containsKey(zkDir)) {
confDirectoryListeners.put(zkDir, new HashSet<Runnable>());
setConfWatcher(zkDir, new WatcherImpl(zkDir));
setConfWatcher(zkDir, new WatcherImpl(zkDir), null);
}
@ -2312,46 +2311,66 @@ public final class ZkController {
@Override
public void process(WatchedEvent event) {
try {
synchronized (confDirectoryListeners) {
// if this is not among directories to be watched then don't set the watcher anymore
if( !confDirectoryListeners.containsKey(zkDir)) {
log.info("Watcher on {} is removed ", zkDir);
return;
}
Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
if (listeners != null && !listeners.isEmpty()) {
final Set<Runnable> listenersCopy = new HashSet<>(listeners);
new Thread() {
//run these in a separate thread because this can be long running
public void run() {
for (final Runnable listener : listenersCopy) {
try {
listener.run();
} catch (Exception e) {
log.warn("listener throws error", e);
}
}
}
}.start();
}
}
Stat stat = null;
try {
stat = zkClient.exists(zkDir, null, true);
} catch (KeeperException e) {
//ignore , it is not a big deal
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
boolean resetWatcher = false;
try {
resetWatcher = fireEventListeners(zkDir);
} finally {
if (Event.EventType.None.equals(event.getType())) {
log.info("A node got unwatched for {}", zkDir);
return;
} else {
setConfWatcher(zkDir,this);
if (resetWatcher) setConfWatcher(zkDir, this, stat);
else log.info("A node got unwatched for {}", zkDir);
}
}
}
}
private void setConfWatcher(String zkDir, Watcher watcher) {
}
private boolean fireEventListeners(String zkDir) {
synchronized (confDirectoryListeners) {
// if this is not among directories to be watched then don't set the watcher anymore
if (!confDirectoryListeners.containsKey(zkDir)) {
log.info("Watcher on {} is removed ", zkDir);
return false;
}
Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
if (listeners != null && !listeners.isEmpty()) {
final Set<Runnable> listenersCopy = new HashSet<>(listeners);
new Thread() {
//run these in a separate thread because this can be long running
public void run() {
for (final Runnable listener : listenersCopy) {
try {
listener.run();
} catch (Exception e) {
log.warn("listener throws error", e);
}
}
}
}.start();
}
}
return true;
}
private void setConfWatcher(String zkDir, Watcher watcher, Stat stat) {
try {
Stat newStat = zkClient.exists(zkDir, watcher, true);
if (stat != null && newStat.getVersion() > stat.getVersion()) {
//a race condition where a we missed an even fired
//so fire the event listeners
fireEventListeners(zkDir);
}
zkClient.exists(zkDir, watcher, true);
} catch (KeeperException e) {
log.error("failed to set watcher for conf dir {} ", zkDir);
@ -2368,6 +2387,7 @@ public final class ZkController {
synchronized (confDirectoryListeners){
for (String s : confDirectoryListeners.keySet()) {
watchZKConfDir(s);
fireEventListeners(s);
}
}
}