mirror of
https://github.com/apache/lucene.git
synced 2025-02-09 11:35:14 +00:00
SOLR-10619: Optimize using cache for DistributedQueue in case of single-consumer
This commit is contained in:
parent
421611bac9
commit
a24fa8d7db
@ -258,6 +258,8 @@ Optimizations
|
|||||||
|
|
||||||
* SOLR-10524: Better ZkStateWriter batching (Cao Manh Dat, Noble Paul, shalin, Scott Blum)
|
* SOLR-10524: Better ZkStateWriter batching (Cao Manh Dat, Noble Paul, shalin, Scott Blum)
|
||||||
|
|
||||||
|
* SOLR-10619: Optimize using cache for DistributedQueue in case of single-consumer (Cao Manh Dat, Scott Blum)
|
||||||
|
|
||||||
Bug Fixes
|
Bug Fixes
|
||||||
----------------------
|
----------------------
|
||||||
* SOLR-10281: ADMIN_PATHS is duplicated in two places and inconsistent. This can cause automatic
|
* SOLR-10281: ADMIN_PATHS is duplicated in two places and inconsistent. This can cause automatic
|
||||||
|
@ -43,7 +43,9 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A distributed queue.
|
* A distributed queue. Optimized for single-consumer,
|
||||||
|
* multiple-producer: if there are multiple consumers on the same ZK queue,
|
||||||
|
* the results should be correct but inefficient
|
||||||
*/
|
*/
|
||||||
public class DistributedQueue {
|
public class DistributedQueue {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
@ -265,18 +267,16 @@ public class DistributedQueue {
|
|||||||
* The caller must double check that the actual node still exists, since the in-memory
|
* The caller must double check that the actual node still exists, since the in-memory
|
||||||
* list is inherently stale.
|
* list is inherently stale.
|
||||||
*/
|
*/
|
||||||
private String firstChild(boolean remove) throws KeeperException, InterruptedException {
|
private String firstChild(boolean remove, boolean refetchIfDirty) throws KeeperException, InterruptedException {
|
||||||
updateLock.lockInterruptibly();
|
updateLock.lockInterruptibly();
|
||||||
try {
|
try {
|
||||||
if (!isDirty) {
|
// We always return from cache first, the cache will be cleared if the node is not exist
|
||||||
// If we're not in a dirty state...
|
if (!knownChildren.isEmpty() && !(isDirty && refetchIfDirty)) {
|
||||||
if (!knownChildren.isEmpty()) {
|
return remove ? knownChildren.pollFirst() : knownChildren.first();
|
||||||
// and we have in-memory children, return from in-memory.
|
}
|
||||||
return remove ? knownChildren.pollFirst() : knownChildren.first();
|
|
||||||
} else {
|
if (!isDirty && knownChildren.isEmpty()) {
|
||||||
// otherwise there's nothing to return
|
return null;
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dirty, try to fetch an updated list of children from ZK.
|
// Dirty, try to fetch an updated list of children from ZK.
|
||||||
@ -332,9 +332,10 @@ public class DistributedQueue {
|
|||||||
Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
|
Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
|
||||||
List<String> foundChildren = new ArrayList<>();
|
List<String> foundChildren = new ArrayList<>();
|
||||||
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
|
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
|
||||||
|
boolean first = true;
|
||||||
while (true) {
|
while (true) {
|
||||||
// Trigger a fetch if needed.
|
// Trigger a refresh, but only force it if this is not the first iteration.
|
||||||
firstChild(false);
|
firstChild(false, !first);
|
||||||
|
|
||||||
updateLock.lockInterruptibly();
|
updateLock.lockInterruptibly();
|
||||||
try {
|
try {
|
||||||
@ -349,6 +350,13 @@ public class DistributedQueue {
|
|||||||
if (waitNanos <= 0) {
|
if (waitNanos <= 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If this is our first time through, force a refresh before waiting.
|
||||||
|
if (first) {
|
||||||
|
first = false;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
waitNanos = changed.awaitNanos(waitNanos);
|
waitNanos = changed.awaitNanos(waitNanos);
|
||||||
} finally {
|
} finally {
|
||||||
updateLock.unlock();
|
updateLock.unlock();
|
||||||
@ -390,7 +398,7 @@ public class DistributedQueue {
|
|||||||
*/
|
*/
|
||||||
private byte[] firstElement() throws KeeperException, InterruptedException {
|
private byte[] firstElement() throws KeeperException, InterruptedException {
|
||||||
while (true) {
|
while (true) {
|
||||||
String firstChild = firstChild(false);
|
String firstChild = firstChild(false, false);
|
||||||
if (firstChild == null) {
|
if (firstChild == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -400,7 +408,9 @@ public class DistributedQueue {
|
|||||||
// Another client deleted the node first, remove the in-memory and retry.
|
// Another client deleted the node first, remove the in-memory and retry.
|
||||||
updateLock.lockInterruptibly();
|
updateLock.lockInterruptibly();
|
||||||
try {
|
try {
|
||||||
knownChildren.remove(firstChild);
|
// Efficient only for single-consumer
|
||||||
|
knownChildren.clear();
|
||||||
|
isDirty = true;
|
||||||
} finally {
|
} finally {
|
||||||
updateLock.unlock();
|
updateLock.unlock();
|
||||||
}
|
}
|
||||||
@ -410,7 +420,7 @@ public class DistributedQueue {
|
|||||||
|
|
||||||
private byte[] removeFirst() throws KeeperException, InterruptedException {
|
private byte[] removeFirst() throws KeeperException, InterruptedException {
|
||||||
while (true) {
|
while (true) {
|
||||||
String firstChild = firstChild(true);
|
String firstChild = firstChild(true, false);
|
||||||
if (firstChild == null) {
|
if (firstChild == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -418,12 +428,15 @@ public class DistributedQueue {
|
|||||||
String path = dir + "/" + firstChild;
|
String path = dir + "/" + firstChild;
|
||||||
byte[] result = zookeeper.getData(path, null, null, true);
|
byte[] result = zookeeper.getData(path, null, null, true);
|
||||||
zookeeper.delete(path, -1, true);
|
zookeeper.delete(path, -1, true);
|
||||||
|
stats.setQueueLength(knownChildren.size());
|
||||||
return result;
|
return result;
|
||||||
} catch (KeeperException.NoNodeException e) {
|
} catch (KeeperException.NoNodeException e) {
|
||||||
// Another client deleted the node first, remove the in-memory and retry.
|
// Another client deleted the node first, remove the in-memory and retry.
|
||||||
updateLock.lockInterruptibly();
|
updateLock.lockInterruptibly();
|
||||||
try {
|
try {
|
||||||
knownChildren.remove(firstChild);
|
// Efficient only for single-consumer
|
||||||
|
knownChildren.clear();
|
||||||
|
isDirty = true;
|
||||||
} finally {
|
} finally {
|
||||||
updateLock.unlock();
|
updateLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -90,6 +90,35 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
|
|||||||
qct.join();
|
qct.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDistributedQueueCache() throws Exception {
|
||||||
|
String dqZNode = "/distqueue/test";
|
||||||
|
byte[] data = "hello world".getBytes(UTF8);
|
||||||
|
|
||||||
|
DistributedQueue consumer = makeDistributedQueue(dqZNode);
|
||||||
|
DistributedQueue producer = makeDistributedQueue(dqZNode);
|
||||||
|
DistributedQueue producer2 = makeDistributedQueue(dqZNode);
|
||||||
|
|
||||||
|
producer2.offer(data);
|
||||||
|
producer.offer(data);
|
||||||
|
producer.offer(data);
|
||||||
|
consumer.poll();
|
||||||
|
|
||||||
|
assertEquals(2, consumer.getStats().getQueueLength());
|
||||||
|
producer.offer(data);
|
||||||
|
producer2.offer(data);
|
||||||
|
consumer.poll();
|
||||||
|
// Wait for watcher being kicked off
|
||||||
|
while (!consumer.isDirty()) {
|
||||||
|
Thread.sleep(20);
|
||||||
|
}
|
||||||
|
// DQ still have elements in their queue, so we should not fetch elements path from Zk
|
||||||
|
assertEquals(1, consumer.getStats().getQueueLength());
|
||||||
|
consumer.poll();
|
||||||
|
consumer.peek();
|
||||||
|
assertEquals(2, consumer.getStats().getQueueLength());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDistributedQueueBlocking() throws Exception {
|
public void testDistributedQueueBlocking() throws Exception {
|
||||||
String dqZNode = "/distqueue/test";
|
String dqZNode = "/distqueue/test";
|
||||||
@ -161,6 +190,13 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
|
|||||||
// dirty and watcher state indeterminate here, race with watcher
|
// dirty and watcher state indeterminate here, race with watcher
|
||||||
Thread.sleep(100); // watcher should have fired now
|
Thread.sleep(100); // watcher should have fired now
|
||||||
assertNotNull(dq.peek());
|
assertNotNull(dq.peek());
|
||||||
|
// in case of race condition, childWatcher is kicked off after peek()
|
||||||
|
if (dq.watcherCount() == 0) {
|
||||||
|
assertTrue(dq.isDirty());
|
||||||
|
dq.poll();
|
||||||
|
dq.offer("hello world".getBytes(UTF8));
|
||||||
|
dq.peek();
|
||||||
|
}
|
||||||
assertEquals(1, dq.watcherCount());
|
assertEquals(1, dq.watcherCount());
|
||||||
assertFalse(dq.isDirty());
|
assertFalse(dq.isDirty());
|
||||||
assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());
|
assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user