SOLR-10420: fix watcher leak in DistributedQueue

This commit is contained in:
Scott Blum 2017-04-17 18:27:12 -04:00 committed by Scott Blum
parent 963c226d26
commit 43c2b2320d
2 changed files with 84 additions and 22 deletions

View File

@ -86,10 +86,9 @@ public class DistributedQueue {
*/ */
private final Condition changed = updateLock.newCondition(); private final Condition changed = updateLock.newCondition();
/** private boolean isDirty = true;
* If non-null, the last watcher to listen for child changes. If null, the in-memory contents are dirty.
*/ private int watcherCount = 0;
private ChildWatcher lastWatcher = null;
public DistributedQueue(SolrZkClient zookeeper, String dir) { public DistributedQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats()); this(zookeeper, dir, new Overseer.Stats());
@ -238,10 +237,10 @@ public class DistributedQueue {
try { try {
while (true) { while (true) {
try { try {
// We don't need to explicitly set isDirty here; if there is a watcher, it will // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
// see the update and set the bit itself; if there is no watcher we can defer // This will get set again when the watcher actually fires, but that's ok.
// the update anyway.
zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true); zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
isDirty = true;
return; return;
} catch (KeeperException.NoNodeException e) { } catch (KeeperException.NoNodeException e) {
try { try {
@ -269,15 +268,25 @@ public class DistributedQueue {
private String firstChild(boolean remove) throws KeeperException, InterruptedException { private String firstChild(boolean remove) throws KeeperException, InterruptedException {
updateLock.lockInterruptibly(); updateLock.lockInterruptibly();
try { try {
// If we're not in a dirty state, and we have in-memory children, return from in-memory. if (!isDirty) {
if (lastWatcher != null && !knownChildren.isEmpty()) { // If we're not in a dirty state...
if (!knownChildren.isEmpty()) {
// and we have in-memory children, return from in-memory.
return remove ? knownChildren.pollFirst() : knownChildren.first(); return remove ? knownChildren.pollFirst() : knownChildren.first();
} else {
// otherwise there's nothing to return
return null;
}
} }
// Try to fetch an updated list of children from ZK. // Dirty, try to fetch an updated list of children from ZK.
ChildWatcher newWatcher = new ChildWatcher(); // Only set a new watcher if there isn't already a watcher.
ChildWatcher newWatcher = (watcherCount == 0) ? new ChildWatcher() : null;
knownChildren = fetchZkChildren(newWatcher); knownChildren = fetchZkChildren(newWatcher);
lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully if (newWatcher != null) {
watcherCount++; // watcher was successfully set
}
isDirty = false;
if (knownChildren.isEmpty()) { if (knownChildren.isEmpty()) {
return null; return null;
} }
@ -422,16 +431,25 @@ public class DistributedQueue {
} }
} }
@VisibleForTesting boolean hasWatcher() throws InterruptedException { @VisibleForTesting int watcherCount() throws InterruptedException {
updateLock.lockInterruptibly(); updateLock.lockInterruptibly();
try { try {
return lastWatcher != null; return watcherCount;
} finally { } finally {
updateLock.unlock(); updateLock.unlock();
} }
} }
private class ChildWatcher implements Watcher { @VisibleForTesting boolean isDirty() throws InterruptedException {
updateLock.lockInterruptibly();
try {
return isDirty;
} finally {
updateLock.unlock();
}
}
@VisibleForTesting class ChildWatcher implements Watcher {
@Override @Override
public void process(WatchedEvent event) { public void process(WatchedEvent event) {
@ -441,10 +459,8 @@ public class DistributedQueue {
} }
updateLock.lock(); updateLock.lock();
try { try {
// this watcher is automatically cleared when fired isDirty = true;
if (lastWatcher == this) { watcherCount--;
lastWatcher = null;
}
// optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
changed.signalAll(); changed.signalAll();
} finally { } finally {

View File

@ -113,13 +113,15 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
// After draining the queue, a watcher should be set. // After draining the queue, a watcher should be set.
assertNull(dq.peek(100)); assertNull(dq.peek(100));
assertTrue(dq.hasWatcher()); assertFalse(dq.isDirty());
assertEquals(1, dq.watcherCount());
forceSessionExpire(); forceSessionExpire();
// Session expiry should have fired the watcher. // Session expiry should have fired the watcher.
Thread.sleep(100); Thread.sleep(100);
assertFalse(dq.hasWatcher()); assertTrue(dq.isDirty());
assertEquals(0, dq.watcherCount());
// Rerun the earlier test make sure updates are still seen, post reconnection. // Rerun the earlier test make sure updates are still seen, post reconnection.
future = executor.submit(() -> new String(dq.peek(true), UTF8)); future = executor.submit(() -> new String(dq.peek(true), UTF8));
@ -137,6 +139,50 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
assertNull(dq.poll()); assertNull(dq.poll());
} }
@Test
public void testLeakChildWatcher() throws Exception {
String dqZNode = "/distqueue/test";
DistributedQueue dq = makeDistributedQueue(dqZNode);
assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
assertNull(dq.peek());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
assertNull(dq.peek(10));
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
dq.offer("hello world".getBytes(UTF8));
assertNotNull(dq.peek()); // synchronously available
// dirty and watcher state indeterminate here, race with watcher
Thread.sleep(100); // watcher should have fired now
assertNotNull(dq.peek());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
}
@Test
public void testLocallyOffer() throws Exception {
String dqZNode = "/distqueue/test";
DistributedQueue dq = makeDistributedQueue(dqZNode);
dq.peekElements(1, 1, s -> true);
for (int i = 0; i < 100; i++) {
byte[] data = String.valueOf(i).getBytes(UTF8);
dq.offer(data);
assertNotNull(dq.peek());
dq.poll();
dq.peekElements(1, 1, s -> true);
}
}
@Test @Test
public void testPeekElements() throws Exception { public void testPeekElements() throws Exception {
String dqZNode = "/distqueue/test"; String dqZNode = "/distqueue/test";