From 56f3f6d9484dd353ac50d47717c872ca9dac16ea Mon Sep 17 00:00:00 2001 From: Tomas Fernandez Lobbe Date: Fri, 26 Jan 2018 10:33:10 -0800 Subject: [PATCH] SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup --- solr/CHANGES.txt | 2 + .../apache/solr/cloud/OverseerTaskQueue.java | 61 +++++++++++-------- .../org/apache/solr/cloud/OverseerTest.java | 60 +++++++++++++++--- 3 files changed, 89 insertions(+), 34 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index a1725e7f74a..f6223745a9e 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -108,6 +108,8 @@ New Features * SOLR-11722: New CREATEROUTEDALIAS SolrCloud command to create a "time routed alias" over a series of collections partitioned by time. (Gus Heck, David Smiley) +* SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup (Tomás Fernández Löbbe, David Smiley, Dawid Weiss) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java index 4a6ea8a5733..368736960c1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java @@ -16,13 +16,16 @@ */ package org.apache.solr.cloud; +import com.codahale.metrics.Timer; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; - -import com.codahale.metrics.Timer; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.util.Pair; @@ -108,25 +111,24 @@ public class OverseerTaskQueue extends ZkDistributedQueue { /** * Watcher that blocks until a WatchedEvent occurs for a znode. */ - private static final class LatchWatcher implements Watcher { + static final class LatchWatcher implements Watcher { - private final Object lock; + private final Lock lock; + private final Condition eventReceived; private WatchedEvent event; private Event.EventType latchEventType; - - LatchWatcher(Object lock) { - this(lock, null); + + LatchWatcher() { + this(null); } - + LatchWatcher(Event.EventType eventType) { - this(new Object(), eventType); - } - - LatchWatcher(Object lock, Event.EventType eventType) { - this.lock = lock; + this.lock = new ReentrantLock(); + this.eventReceived = lock.newCondition(); this.latchEventType = eventType; } + @Override public void process(WatchedEvent event) { // session events are not change events, and do not remove the watcher @@ -136,17 +138,29 @@ public class OverseerTaskQueue extends ZkDistributedQueue { // If latchEventType is not null, only fire if the type matches LOG.debug("{} fired on path {} state {} latchEventType {}", event.getType(), event.getPath(), event.getState(), latchEventType); if (latchEventType == null || event.getType() == latchEventType) { - synchronized (lock) { + lock.lock(); + try { this.event = event; - lock.notifyAll(); + eventReceived.signalAll(); + } finally { + lock.unlock(); } } } - public void await(long timeout) throws InterruptedException { - synchronized (lock) { - if (this.event != null) return; - lock.wait(timeout); + public void await(long timeoutMs) throws InterruptedException { + assert timeoutMs > 0; + long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs); + lock.lock(); + try { + while (this.event == null) { + if (timeoutNanos <= 0) { + return; + } + timeoutNanos = eventReceived.awaitNanos(timeoutNanos); + } + } finally { + lock.unlock(); } } @@ -187,17 +201,14 @@ public class OverseerTaskQueue extends ZkDistributedQueue { // otherwise we may miss the response. String watchID = createResponseNode(); - Object lock = new Object(); - LatchWatcher watcher = new LatchWatcher(lock); + LatchWatcher watcher = new LatchWatcher(); Stat stat = zookeeper.exists(watchID, watcher, true); // create the request node createRequestNode(data, watchID); - synchronized (lock) { - if (stat != null && watcher.getWatchedEvent() == null) { - watcher.await(timeout); - } + if (stat != null) { + watcher.await(timeout); } byte[] bytes = zookeeper.getData(watchID, null, null, true); // create the event before deleting the node, otherwise we can get the deleted diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index 7259d389a2a..3b46922b896 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -16,7 +16,13 @@ */ package org.apache.solr.cloud; -import javax.xml.parsers.ParserConfigurationException; +import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; import java.io.File; import java.io.IOException; import java.lang.invoke.MethodHandles; @@ -31,10 +37,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; +import javax.xml.parsers.ParserConfigurationException; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.cloud.DistributedQueue; @@ -64,7 +69,10 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.proto.WatcherEvent; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -75,11 +83,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; -import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - @Slow public class OverseerTest extends SolrTestCaseJ4 { @@ -1475,5 +1478,44 @@ public class OverseerTest extends SolrTestCaseJ4 { server.shutdown(); } } + + @Test + public void testLatchWatcher() throws InterruptedException { + OverseerTaskQueue.LatchWatcher latch1 = new OverseerTaskQueue.LatchWatcher(); + long before = System.nanoTime(); + latch1.await(100); + long after = System.nanoTime(); + assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) > 50); + assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 500);// Mostly to make sure the millis->nanos->millis is not broken + latch1.process(new WatchedEvent(new WatcherEvent(1, 1, "/foo/bar"))); + before = System.nanoTime(); + latch1.await(10000);// Expecting no wait + after = System.nanoTime(); + assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000); + + final AtomicBoolean expectedEventProcessed = new AtomicBoolean(false); + final AtomicBoolean doneWaiting = new AtomicBoolean(false); + final OverseerTaskQueue.LatchWatcher latch2 = new OverseerTaskQueue.LatchWatcher(Event.EventType.NodeCreated); + Thread t = new Thread(()->{ + //Process an event of a different type first, this shouldn't release the latch + latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeDeleted.getIntValue(), 1, "/foo/bar"))); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertFalse("Latch shouldn't have been released", doneWaiting.get()); + // Now process the correct type of event + expectedEventProcessed.set(true); + latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeCreated.getIntValue(), 1, "/foo/bar"))); + }); + t.start(); + before = System.nanoTime(); + latch2.await(10000); // It shouldn't wait this long, t should notify the lock + after = System.nanoTime(); + doneWaiting.set(true); + assertTrue(expectedEventProcessed.get()); + assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000); + } }