SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup

This commit is contained in:
Tomas Fernandez Lobbe 2018-01-26 10:33:10 -08:00
parent 3856ae2d85
commit 56f3f6d948
3 changed files with 89 additions and 34 deletions

View File

@ -108,6 +108,8 @@ New Features
* SOLR-11722: New CREATEROUTEDALIAS SolrCloud command to create a "time routed alias" over a series of collections
partitioned by time. (Gus Heck, David Smiley)
* SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup (Tomás Fernández Löbbe, David Smiley, Dawid Weiss)
Bug Fixes
----------------------

View File

@ -16,13 +16,16 @@
*/
package org.apache.solr.cloud;
import com.codahale.metrics.Timer;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import com.codahale.metrics.Timer;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.util.Pair;
@ -108,25 +111,24 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
/**
* Watcher that blocks until a WatchedEvent occurs for a znode.
*/
private static final class LatchWatcher implements Watcher {
static final class LatchWatcher implements Watcher {
private final Object lock;
private final Lock lock;
private final Condition eventReceived;
private WatchedEvent event;
private Event.EventType latchEventType;
LatchWatcher(Object lock) {
this(lock, null);
LatchWatcher() {
this(null);
}
LatchWatcher(Event.EventType eventType) {
this(new Object(), eventType);
}
LatchWatcher(Object lock, Event.EventType eventType) {
this.lock = lock;
this.lock = new ReentrantLock();
this.eventReceived = lock.newCondition();
this.latchEventType = eventType;
}
@Override
public void process(WatchedEvent event) {
// session events are not change events, and do not remove the watcher
@ -136,17 +138,29 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
// If latchEventType is not null, only fire if the type matches
LOG.debug("{} fired on path {} state {} latchEventType {}", event.getType(), event.getPath(), event.getState(), latchEventType);
if (latchEventType == null || event.getType() == latchEventType) {
synchronized (lock) {
lock.lock();
try {
this.event = event;
lock.notifyAll();
eventReceived.signalAll();
} finally {
lock.unlock();
}
}
}
public void await(long timeout) throws InterruptedException {
synchronized (lock) {
if (this.event != null) return;
lock.wait(timeout);
public void await(long timeoutMs) throws InterruptedException {
assert timeoutMs > 0;
long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
lock.lock();
try {
while (this.event == null) {
if (timeoutNanos <= 0) {
return;
}
timeoutNanos = eventReceived.awaitNanos(timeoutNanos);
}
} finally {
lock.unlock();
}
}
@ -187,17 +201,14 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
// otherwise we may miss the response.
String watchID = createResponseNode();
Object lock = new Object();
LatchWatcher watcher = new LatchWatcher(lock);
LatchWatcher watcher = new LatchWatcher();
Stat stat = zookeeper.exists(watchID, watcher, true);
// create the request node
createRequestNode(data, watchID);
synchronized (lock) {
if (stat != null && watcher.getWatchedEvent() == null) {
watcher.await(timeout);
}
if (stat != null) {
watcher.await(timeout);
}
byte[] bytes = zookeeper.getData(watchID, null, null, true);
// create the event before deleting the node, otherwise we can get the deleted

View File

@ -16,7 +16,13 @@
*/
package org.apache.solr.cloud;
import javax.xml.parsers.ParserConfigurationException;
import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@ -31,10 +37,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
@ -64,7 +69,10 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.WatcherEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -75,11 +83,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Slow
public class OverseerTest extends SolrTestCaseJ4 {
@ -1475,5 +1478,44 @@ public class OverseerTest extends SolrTestCaseJ4 {
server.shutdown();
}
}
@Test
public void testLatchWatcher() throws InterruptedException {
OverseerTaskQueue.LatchWatcher latch1 = new OverseerTaskQueue.LatchWatcher();
long before = System.nanoTime();
latch1.await(100);
long after = System.nanoTime();
assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) > 50);
assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 500);// Mostly to make sure the millis->nanos->millis is not broken
latch1.process(new WatchedEvent(new WatcherEvent(1, 1, "/foo/bar")));
before = System.nanoTime();
latch1.await(10000);// Expecting no wait
after = System.nanoTime();
assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);
final AtomicBoolean expectedEventProcessed = new AtomicBoolean(false);
final AtomicBoolean doneWaiting = new AtomicBoolean(false);
final OverseerTaskQueue.LatchWatcher latch2 = new OverseerTaskQueue.LatchWatcher(Event.EventType.NodeCreated);
Thread t = new Thread(()->{
//Process an event of a different type first, this shouldn't release the latch
latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeDeleted.getIntValue(), 1, "/foo/bar")));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertFalse("Latch shouldn't have been released", doneWaiting.get());
// Now process the correct type of event
expectedEventProcessed.set(true);
latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeCreated.getIntValue(), 1, "/foo/bar")));
});
t.start();
before = System.nanoTime();
latch2.await(10000); // It shouldn't wait this long, t should notify the lock
after = System.nanoTime();
doneWaiting.set(true);
assertTrue(expectedEventProcessed.get());
assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);
}
}