SOLR-6631: Part deux - refactor LatchChildWatcher to LatchWatcher that takes an optional EventType during construction to only release the latch when a specific event type is received.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1635573 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2014-10-30 18:19:46 +00:00
parent b57699ff06
commit 967172f78d
3 changed files with 181 additions and 25 deletions

View File

@ -278,6 +278,10 @@ Bug Fixes
* SOLR-6591: Overseer can use stale cluster state and lose updates for collections
with stateFormat > 1. (shalin)
* SOLR-6631: DistributedQueue spinning on calling zookeeper getChildren()
(Jessica Cheng Mallet, Mark Miller, Timothy Potter)
Optimizations
----------------------

View File

@ -43,8 +43,7 @@ import org.slf4j.LoggerFactory;
* A distributed queue from zk recipes.
*/
public class DistributedQueue {
private static final Logger LOG = LoggerFactory
.getLogger(DistributedQueue.class);
private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
private static long DEFAULT_TIMEOUT = 5*60*1000;
@ -229,38 +228,50 @@ public class DistributedQueue {
time.stop();
}
}
private class LatchChildWatcher implements Watcher {
final Object lock;
private WatchedEvent event = null;
public LatchChildWatcher() {
this.lock = new Object();
/**
* Watcher that blocks until a WatchedEvent occurs for a znode.
*/
private final class LatchWatcher implements Watcher {
private final Object lock;
private WatchedEvent event;
private Event.EventType latchEventType;
LatchWatcher(Object lock) {
this(lock, null);
}
public LatchChildWatcher(Object lock) {
this.lock = lock;
LatchWatcher(Event.EventType eventType) {
this(new Object(), eventType);
}
LatchWatcher(Object lock, Event.EventType eventType) {
this.lock = lock;
this.latchEventType = eventType;
}
@Override
public void process(WatchedEvent event) {
LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
+ event.getState() + " type " + event.getType());
synchronized (lock) {
this.event = event;
lock.notifyAll();
Event.EventType eventType = event.getType();
// None events are ignored
// If latchEventType is not null, only fire if the type matches
if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
synchronized (lock) {
this.event = event;
lock.notifyAll();
}
}
}
public void await(long timeout) throws InterruptedException {
synchronized (lock) {
if (this.event != null) return;
lock.wait(timeout);
}
}
public WatchedEvent getWatchedEvent() {
return event;
}
@ -268,13 +279,13 @@ public class DistributedQueue {
// we avoid creating *many* watches in some cases
// by saving the childrenWatcher and the children associated - see SOLR-6336
private LatchChildWatcher childrenWatcher;
private LatchWatcher childrenWatcher;
private TreeMap<Long,String> fetchedChildren;
private final Object childrenWatcherLock = new Object();
private Map<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
{
LatchChildWatcher watcher;
LatchWatcher watcher;
TreeMap<Long,String> children;
synchronized (childrenWatcherLock) {
watcher = childrenWatcher;
@ -282,7 +293,8 @@ public class DistributedQueue {
}
if (watcher == null || watcher.getWatchedEvent() != null) {
watcher = new LatchChildWatcher();
// this watcher is only interested in child change events
watcher = new LatchWatcher(Watcher.Event.EventType.NodeChildrenChanged);
while (true) {
try {
children = orderedChildren(watcher);
@ -384,8 +396,9 @@ public class DistributedQueue {
String watchID = createData(
dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
null, CreateMode.EPHEMERAL);
Object lock = new Object();
LatchChildWatcher watcher = new LatchChildWatcher(lock);
LatchWatcher watcher = new LatchWatcher(lock);
synchronized (lock) {
if (zookeeper.exists(watchID, watcher, true) != null) {
watcher.await(timeout);

View File

@ -0,0 +1,139 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.io.File;
import java.nio.charset.Charset;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class DistributedQueueTest extends SolrTestCaseJ4 {
private static final Charset UTF8 = Charset.forName("UTF-8");
protected ZkTestServer zkServer;
protected SolrZkClient zkClient;
@Before
public void beforeClass() {
System.setProperty("solr.solrxml.location", "zookeeper");
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
setupZk();
}
@Test
public void testDistributedQueue() throws Exception {
String dqZNode = "/distqueue/test";
String testData = "hello world";
long timeoutMs = 500L;
DistributedQueue dq = new DistributedQueue(zkClient, setupDistributedQueueZNode(dqZNode));
// basic ops
assertTrue(dq.poll() == null);
byte[] data = testData.getBytes(UTF8);
dq.offer(data);
assertEquals(new String(dq.peek(),UTF8), testData);
assertEquals(new String(dq.take(),UTF8), testData);
assertTrue(dq.poll() == null);
QueueEvent qe = dq.offer(data, timeoutMs);
assertNotNull(qe);
assertEquals(new String(dq.remove(),UTF8), testData);
// should block until the background thread makes the offer
(new QueueChangerThread(dq, 1000)).start();
qe = dq.peek(true);
assertNotNull(qe);
dq.remove();
// timeout scenario ... background thread won't offer until long after the peek times out
QueueChangerThread qct = new QueueChangerThread(dq, 1000);
qct.start();
qe = dq.peek(500);
assertTrue(qe == null);
try {
qct.interrupt();
} catch (Exception exc) {}
}
private class QueueChangerThread extends Thread {
DistributedQueue dq;
long waitBeforeOfferMs;
QueueChangerThread(DistributedQueue dq, long waitBeforeOfferMs) {
this.dq = dq;
this.waitBeforeOfferMs = waitBeforeOfferMs;
}
public void run() {
try {
Thread.sleep(waitBeforeOfferMs);
dq.offer(getName().getBytes(UTF8));
} catch (InterruptedException ie) {
// do nothing
} catch (Exception exc) {
throw new RuntimeException(exc);
}
}
}
protected String setupDistributedQueueZNode(String znodePath) throws Exception {
if (!zkClient.exists("/", true))
zkClient.makePath("/", false, true);
if (zkClient.exists(znodePath, true))
zkClient.clean(znodePath);
zkClient.makePath(znodePath, false, true);
return znodePath;
}
@Override
@After
public void tearDown() throws Exception {
try {
super.tearDown();
} catch (Exception exc) {}
closeZk();
}
protected void setupZk() throws Exception {
System.setProperty("zkClientTimeout", "8000");
zkServer = new ZkTestServer(createTempDir("zkData").toFile().getAbsolutePath());
zkServer.run();
System.setProperty("zkHost", zkServer.getZkAddress());
zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
assertTrue(zkClient.isConnected());
}
protected void closeZk() throws Exception {
if (zkClient != null)
zkClient.close();
zkServer.shutdown();
}
}