SOLR-6336: DistributedQueue can easily create too many ZooKeeper Watches. (closes #80)

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1616654 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2014-08-08 01:16:36 +00:00
parent 611190f82f
commit c805a20bf5
2 changed files with 75 additions and 93 deletions

View File

@ -243,6 +243,9 @@ Bug Fixes
* SOLR-6163: Correctly decode special characters in managed stopwords and synonym endpoints.
(Vitaliy Zhovtyuk, Timo Schmidt via Timothy Potter)
* SOLR-6336: DistributedQueue can easily create too many ZooKeeper Watches.
(Ramkumar Aiyengar via Mark Miller)
Optimizations
---------------------

View File

@ -270,6 +270,7 @@ public class DistributedQueue {
public void await(long timeout) throws InterruptedException {
synchronized (lock) {
if (this.event != null) return;
lock.wait(timeout);
}
}
@ -278,41 +279,60 @@ public class DistributedQueue {
return event;
}
}
// we avoid creating *many* watches in some cases
// by saving the childrenWatcher - see SOLR-6336
private volatile LatchChildWatcher childrenWatcher;
private TreeMap<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
{
LatchChildWatcher watcher = childrenWatcher;
TreeMap<Long,String> children = new TreeMap<> ();
if (watcher == null || watcher.getWatchedEvent() != null) {
watcher = new LatchChildWatcher();
while (true) {
try {
children = orderedChildren(watcher);
break;
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
// go back to the loop and try again
}
}
childrenWatcher = watcher;
}
while (true) {
if (!children.isEmpty()) break;
watcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
if (watcher.getWatchedEvent() != null) {
children = orderedChildren(null);
}
if (wait != Long.MAX_VALUE) break;
}
return children;
}
/**
* Removes the head of the queue and returns it, blocks until it succeeds.
*
* @return The former head of the queue
*/
public byte[] take() throws KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren;
// Same as for element. Should refactor this.
TimerContext timer = stats.time(dir + "_take");
try {
while (true) {
LatchChildWatcher childWatcher = new LatchChildWatcher();
TreeMap<Long, String> orderedChildren = getChildren(Long.MAX_VALUE);
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
orderedChildren = orderedChildren(childWatcher);
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if (orderedChildren.size() == 0) {
childWatcher.await(DEFAULT_TIMEOUT);
continue;
}
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}
// Another client deleted the node first.
}
}
return null; // shouldn't really reach here..
} finally {
timer.stop();
}
@ -404,59 +424,36 @@ public class DistributedQueue {
ArrayList<QueueEvent> topN = new ArrayList<>();
LOG.debug("Peeking for top {} elements. ExcludeSet: " + excludeSet.toString());
boolean waitedEnough = false;
TimerContext time = null;
if (wait == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
else time = stats.time(dir + "_peekTopN_wait" + wait);
try {
TreeMap<Long, String> orderedChildren;
while (true) {
LatchChildWatcher childWatcher = new LatchChildWatcher();
try {
orderedChildren = orderedChildren(childWatcher);
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if (orderedChildren.size() == 0) {
if(waitedEnough) return null;
childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
waitedEnough = wait != Long.MAX_VALUE;
continue;
}
for (String headNode : orderedChildren.values()) {
if (headNode != null && topN.size() < n) {
try {
String id = dir + "/" + headNode;
if (excludeSet != null && excludeSet.contains(id)) continue;
QueueEvent queueEvent = new QueueEvent(id,
zookeeper.getData(dir + "/" + headNode, null, null, true), null);
topN.add(queueEvent);
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
} else {
if (topN.size() >= 1) {
printQueueEventsListElementIds(topN);
return topN;
}
TreeMap<Long, String> orderedChildren = getChildren(wait);
for (String headNode : orderedChildren.values()) {
if (headNode != null && topN.size() < n) {
try {
String id = dir + "/" + headNode;
if (excludeSet != null && excludeSet.contains(id)) continue;
QueueEvent queueEvent = new QueueEvent(id,
zookeeper.getData(dir + "/" + headNode, null, null, true), null);
topN.add(queueEvent);
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
} else {
if (topN.size() >= 1) {
printQueueEventsListElementIds(topN);
return topN;
}
}
if (topN.size() > 0 ) {
printQueueEventsListElementIds(topN);
return topN;
}
if (waitedEnough) {
LOG.debug("Waited enough, returning null after peekTopN");
return null;
}
childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
waitedEnough = wait != Long.MAX_VALUE;
}
if (topN.size() > 0 ) {
printQueueEventsListElementIds(topN);
return topN;
}
return null;
} finally {
time.stop();
}
@ -559,7 +556,7 @@ public class DistributedQueue {
public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
return peek(block ? Long.MAX_VALUE : 0);
}
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty after wait ms.
@ -579,35 +576,17 @@ public class DistributedQueue {
return element();
}
TreeMap<Long, String> orderedChildren;
boolean waitedEnough = false;
while (true) {
LatchChildWatcher childWatcher = new LatchChildWatcher();
TreeMap<Long, String> orderedChildren = getChildren(wait);
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
orderedChildren = orderedChildren(childWatcher);
byte[] data = zookeeper.getData(path, null, null, true);
return new QueueEvent(path, data, null);
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if (waitedEnough) {
if (orderedChildren.isEmpty()) return null;
}
if (orderedChildren.size() == 0) {
childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
waitedEnough = wait != Long.MAX_VALUE;
continue;
}
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
return new QueueEvent(path, data, childWatcher.getWatchedEvent());
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}
// Another client deleted the node first.
}
}
return null;
} finally {
time.stop();
}