another thread safety fix

This commit is contained in:
fjy 2013-07-08 10:59:07 -07:00
parent 454f23d7c0
commit adda1488dc
1 changed files with 22 additions and 18 deletions

View File

@ -64,6 +64,8 @@ public class Announcer
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final Object lock = new Object();
private boolean started = false;
public Announcer(
@ -249,28 +251,30 @@ public class Announcer
public void update(final String path, final byte[] bytes)
{
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
synchronized (lock) {
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();
final String nodePath = pathAndNode.getNode();
final String parentPath = pathAndNode.getPath();
final String nodePath = pathAndNode.getNode();
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null || subPaths.get(nodePath) == null) {
announce(path, bytes);
return;
}
try {
byte[] oldBytes = subPaths.get(nodePath);
if (!Arrays.equals(oldBytes, bytes)) {
subPaths.put(nodePath, bytes);
updateAnnouncement(path, bytes);
if (subPaths == null || subPaths.get(nodePath) == null) {
announce(path, bytes);
return;
}
try {
byte[] oldBytes = subPaths.get(nodePath);
if (!Arrays.equals(oldBytes, bytes)) {
subPaths.put(nodePath, bytes);
updateAnnouncement(path, bytes);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}