SOLR-6336: Cache children as well so that they can be returned when the watcher is reused.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1616771 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2014-08-08 14:19:07 +00:00
parent db2e984a7b
commit 7959276928
1 changed files with 31 additions and 28 deletions

View File

@ -34,7 +34,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
@ -93,14 +95,8 @@ public class DistributedQueue {
private TreeMap<Long,String> orderedChildren(Watcher watcher) private TreeMap<Long,String> orderedChildren(Watcher watcher)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren = new TreeMap<>(); TreeMap<Long,String> orderedChildren = new TreeMap<>();
List<String> childNames = null; List<String> childNames = zookeeper.getChildren(dir, watcher, true);
try {
childNames = zookeeper.getChildren(dir, watcher, true);
} catch (KeeperException.NoNodeException e) {
throw e;
}
for (String childName : childNames) { for (String childName : childNames) {
try { try {
// Check format // Check format
@ -127,13 +123,7 @@ public class DistributedQueue {
public boolean containsTaskWithRequestId(String requestId) public boolean containsTaskWithRequestId(String requestId)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
List<String> childNames = null; List<String> childNames = zookeeper.getChildren(dir, null, true);
try {
childNames = zookeeper.getChildren(dir, null, true);
} catch (KeeperException.NoNodeException e) {
throw e;
}
for (String childName : childNames) { for (String childName : childNames) {
if (childName != null) { if (childName != null) {
try { try {
@ -249,11 +239,13 @@ public class DistributedQueue {
private class LatchChildWatcher implements Watcher { private class LatchChildWatcher implements Watcher {
Object lock = new Object(); final Object lock;
private WatchedEvent event = null; private WatchedEvent event = null;
public LatchChildWatcher() {} public LatchChildWatcher() {
this.lock = new Object();
}
public LatchChildWatcher(Object lock) { public LatchChildWatcher(Object lock) {
this.lock = lock; this.lock = lock;
} }
@ -281,12 +273,20 @@ public class DistributedQueue {
} }
// we avoid creating *many* watches in some cases // we avoid creating *many* watches in some cases
// by saving the childrenWatcher - see SOLR-6336 // by saving the childrenWatcher and the children associated - see SOLR-6336
private volatile LatchChildWatcher childrenWatcher; private LatchChildWatcher childrenWatcher;
private TreeMap<Long, String> getChildren(long wait) throws InterruptedException, KeeperException private TreeMap<Long,String> fetchedChildren;
private final Object childrenWatcherLock = new Object();
private Map<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
{ {
LatchChildWatcher watcher = childrenWatcher; LatchChildWatcher watcher;
TreeMap<Long,String> children = new TreeMap<> (); TreeMap<Long,String> children;
synchronized (childrenWatcherLock) {
watcher = childrenWatcher;
children = fetchedChildren;
}
if (watcher == null || watcher.getWatchedEvent() != null) { if (watcher == null || watcher.getWatchedEvent() != null) {
watcher = new LatchChildWatcher(); watcher = new LatchChildWatcher();
while (true) { while (true) {
@ -298,7 +298,10 @@ public class DistributedQueue {
// go back to the loop and try again // go back to the loop and try again
} }
} }
childrenWatcher = watcher; synchronized (childrenWatcherLock) {
childrenWatcher = watcher;
fetchedChildren = children;
}
} }
while (true) { while (true) {
@ -309,7 +312,7 @@ public class DistributedQueue {
} }
if (wait != Long.MAX_VALUE) break; if (wait != Long.MAX_VALUE) break;
} }
return children; return Collections.unmodifiableMap(children);
} }
/** /**
@ -321,7 +324,7 @@ public class DistributedQueue {
// Same as for element. Should refactor this. // Same as for element. Should refactor this.
TimerContext timer = stats.time(dir + "_take"); TimerContext timer = stats.time(dir + "_take");
try { try {
TreeMap<Long, String> orderedChildren = getChildren(Long.MAX_VALUE); Map<Long, String> orderedChildren = getChildren(Long.MAX_VALUE);
for (String headNode : orderedChildren.values()) { for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode; String path = dir + "/" + headNode;
try { try {
@ -429,7 +432,7 @@ public class DistributedQueue {
else time = stats.time(dir + "_peekTopN_wait" + wait); else time = stats.time(dir + "_peekTopN_wait" + wait);
try { try {
TreeMap<Long, String> orderedChildren = getChildren(wait); Map<Long, String> orderedChildren = getChildren(wait);
for (String headNode : orderedChildren.values()) { for (String headNode : orderedChildren.values()) {
if (headNode != null && topN.size() < n) { if (headNode != null && topN.size() < n) {
try { try {
@ -576,7 +579,7 @@ public class DistributedQueue {
return element(); return element();
} }
TreeMap<Long, String> orderedChildren = getChildren(wait); Map<Long, String> orderedChildren = getChildren(wait);
for (String headNode : orderedChildren.values()) { for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode; String path = dir + "/" + headNode;
try { try {