mirror of https://github.com/apache/lucene.git
SOLR-9191: OverseerTaskQueue.peekTopN() fatally flawed
This commit is contained in:
parent
6c0331b830
commit
ed2621de84
|
@ -46,7 +46,11 @@ Optimizations
|
||||||
(Ryan Zezeski, Mark Miller, Shawn Heisey, Steve Davids)
|
(Ryan Zezeski, Mark Miller, Shawn Heisey, Steve Davids)
|
||||||
|
|
||||||
================== 6.2.0 ==================
|
================== 6.2.0 ==================
|
||||||
(No Changes)
|
|
||||||
|
Bug Fixes
|
||||||
|
----------------------
|
||||||
|
|
||||||
|
* SOLR-9191: OverseerTaskQueue.peekTopN() fatally flawed (Scott Blum, Noble Paul)
|
||||||
|
|
||||||
================== 6.1.0 ==================
|
================== 6.1.0 ==================
|
||||||
|
|
||||||
|
|
|
@ -17,14 +17,15 @@
|
||||||
package org.apache.solr.cloud;
|
package org.apache.solr.cloud;
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.Collections;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
import java.util.SortedSet;
|
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
@ -32,6 +33,7 @@ import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.SolrException.ErrorCode;
|
import org.apache.solr.common.SolrException.ErrorCode;
|
||||||
import org.apache.solr.common.cloud.SolrZkClient;
|
import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
import org.apache.solr.common.cloud.ZkCmdExecutor;
|
import org.apache.solr.common.cloud.ZkCmdExecutor;
|
||||||
|
import org.apache.solr.common.util.Pair;
|
||||||
import org.apache.solr.util.stats.TimerContext;
|
import org.apache.solr.util.stats.TimerContext;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -80,21 +82,15 @@ public class DistributedQueue {
|
||||||
private TreeSet<String> knownChildren = new TreeSet<>();
|
private TreeSet<String> knownChildren = new TreeSet<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to wait on a non-empty queue; you must hold {@link #updateLock} and verify that
|
* Used to wait on ZK changes to the child list; you must hold {@link #updateLock} before waiting on this condition.
|
||||||
* {@link #knownChildren} is empty before waiting on this condition.
|
|
||||||
*/
|
*/
|
||||||
private final Condition notEmpty = updateLock.newCondition();
|
private final Condition changed = updateLock.newCondition();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If non-null, the last watcher to listen for child changes.
|
* If non-null, the last watcher to listen for child changes. If null, the in-memory contents are dirty.
|
||||||
*/
|
*/
|
||||||
private ChildWatcher lastWatcher = null;
|
private ChildWatcher lastWatcher = null;
|
||||||
|
|
||||||
/**
|
|
||||||
* If true, ZK's child list probably doesn't match what's in memory.
|
|
||||||
*/
|
|
||||||
private boolean isDirty = true;
|
|
||||||
|
|
||||||
public DistributedQueue(SolrZkClient zookeeper, String dir) {
|
public DistributedQueue(SolrZkClient zookeeper, String dir) {
|
||||||
this(zookeeper, dir, new Overseer.Stats());
|
this(zookeeper, dir, new Overseer.Stats());
|
||||||
}
|
}
|
||||||
|
@ -165,7 +161,7 @@ public class DistributedQueue {
|
||||||
if (result != null) {
|
if (result != null) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
waitNanos = notEmpty.awaitNanos(waitNanos);
|
waitNanos = changed.awaitNanos(waitNanos);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -222,7 +218,7 @@ public class DistributedQueue {
|
||||||
if (result != null) {
|
if (result != null) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
notEmpty.await();
|
changed.await();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
updateLock.unlock();
|
updateLock.unlock();
|
||||||
|
@ -273,25 +269,19 @@ public class DistributedQueue {
|
||||||
private String firstChild(boolean remove) throws KeeperException, InterruptedException {
|
private String firstChild(boolean remove) throws KeeperException, InterruptedException {
|
||||||
updateLock.lockInterruptibly();
|
updateLock.lockInterruptibly();
|
||||||
try {
|
try {
|
||||||
// Try to fetch the first in-memory child.
|
// If we're not in a dirty state, and we have in-memory children, return from in-memory.
|
||||||
if (!knownChildren.isEmpty()) {
|
if (lastWatcher != null && !knownChildren.isEmpty()) {
|
||||||
return remove ? knownChildren.pollFirst() : knownChildren.first();
|
return remove ? knownChildren.pollFirst() : knownChildren.first();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lastWatcher != null && !isDirty) {
|
|
||||||
// No children, no known updates, and a watcher is already set; nothing we can do.
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to fetch an updated list of children from ZK.
|
// Try to fetch an updated list of children from ZK.
|
||||||
ChildWatcher newWatcher = new ChildWatcher();
|
ChildWatcher newWatcher = new ChildWatcher();
|
||||||
knownChildren = fetchZkChildren(newWatcher);
|
knownChildren = fetchZkChildren(newWatcher);
|
||||||
lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
|
lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
|
||||||
isDirty = false;
|
|
||||||
if (knownChildren.isEmpty()) {
|
if (knownChildren.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
notEmpty.signalAll();
|
changed.signalAll();
|
||||||
return remove ? knownChildren.pollFirst() : knownChildren.first();
|
return remove ? knownChildren.pollFirst() : knownChildren.first();
|
||||||
} finally {
|
} finally {
|
||||||
updateLock.unlock();
|
updateLock.unlock();
|
||||||
|
@ -325,26 +315,63 @@ public class DistributedQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the currently-known set of children from memory. If there are no children,
|
* Return the currently-known set of elements, using child names from memory. If no children are found, or no
|
||||||
* waits up to {@code waitMillis} for at least one child to become available. May
|
* children pass {@code acceptFilter}, waits up to {@code waitMillis} for at least one child to become available.
|
||||||
* update the set of known children.
|
* <p/>
|
||||||
|
* Package-private to support {@link OverseerTaskQueue} specifically.
|
||||||
*/
|
*/
|
||||||
SortedSet<String> getChildren(long waitMillis) throws KeeperException, InterruptedException {
|
Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Function<String, Boolean> acceptFilter) throws KeeperException, InterruptedException {
|
||||||
|
List<String> foundChildren = new ArrayList<>();
|
||||||
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
|
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
|
||||||
while (waitNanos > 0) {
|
while (true) {
|
||||||
// Trigger a fetch if needed.
|
// Trigger a fetch if needed.
|
||||||
firstElement();
|
firstChild(false);
|
||||||
|
|
||||||
updateLock.lockInterruptibly();
|
updateLock.lockInterruptibly();
|
||||||
try {
|
try {
|
||||||
if (!knownChildren.isEmpty()) {
|
for (String child : knownChildren) {
|
||||||
return new TreeSet<>(knownChildren);
|
if (acceptFilter.apply(child)) {
|
||||||
|
foundChildren.add(child);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
waitNanos = notEmpty.awaitNanos(waitNanos);
|
if (!foundChildren.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (waitNanos <= 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
waitNanos = changed.awaitNanos(waitNanos);
|
||||||
} finally {
|
} finally {
|
||||||
updateLock.unlock();
|
updateLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!foundChildren.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return Collections.emptySortedSet();
|
|
||||||
|
// Technically we could restart the method if we fail to actually obtain any valid children
|
||||||
|
// from ZK, but this is a super rare case, and the latency of the ZK fetches would require
|
||||||
|
// much more sophisticated waitNanos tracking.
|
||||||
|
List<Pair<String, byte[]>> result = new ArrayList<>();
|
||||||
|
for (String child : foundChildren) {
|
||||||
|
if (result.size() >= max) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
|
||||||
|
result.add(new Pair<>(child, data));
|
||||||
|
} catch (KeeperException.NoNodeException e) {
|
||||||
|
// Another client deleted the node first, remove the in-memory and continue.
|
||||||
|
updateLock.lockInterruptibly();
|
||||||
|
try {
|
||||||
|
knownChildren.remove(child);
|
||||||
|
} finally {
|
||||||
|
updateLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -418,10 +445,8 @@ public class DistributedQueue {
|
||||||
if (lastWatcher == this) {
|
if (lastWatcher == this) {
|
||||||
lastWatcher = null;
|
lastWatcher = null;
|
||||||
}
|
}
|
||||||
// Do no updates in this thread, just signal state back to client threads.
|
|
||||||
isDirty = true;
|
|
||||||
// optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
|
// optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
|
||||||
notEmpty.signalAll();
|
changed.signalAll();
|
||||||
} finally {
|
} finally {
|
||||||
updateLock.unlock();
|
updateLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,9 +190,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
|
||||||
cleanUpWorkQueue();
|
cleanUpWorkQueue();
|
||||||
|
|
||||||
List<QueueEvent> heads = workQueue.peekTopN(MAX_PARALLEL_TASKS, runningZKTasks, 2000L);
|
List<QueueEvent> heads = workQueue.peekTopN(MAX_PARALLEL_TASKS, runningZKTasks, 2000L);
|
||||||
|
if (heads.isEmpty()) {
|
||||||
if (heads == null)
|
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
|
log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
|
||||||
|
|
||||||
|
@ -466,6 +466,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
|
||||||
log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
|
log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
workQueue.remove(head);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
|
private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
package org.apache.solr.cloud;
|
package org.apache.solr.cloud;
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -25,6 +24,7 @@ import java.util.TreeSet;
|
||||||
|
|
||||||
import org.apache.solr.common.cloud.SolrZkClient;
|
import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
|
import org.apache.solr.common.util.Pair;
|
||||||
import org.apache.solr.util.stats.TimerContext;
|
import org.apache.solr.util.stats.TimerContext;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -82,9 +82,8 @@ public class OverseerTaskQueue extends DistributedQueue {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the event and save the response into the other path.
|
* Remove the event and save the response into the other path.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public byte[] remove(QueueEvent event) throws KeeperException,
|
public void remove(QueueEvent event) throws KeeperException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
TimerContext time = stats.time(dir + "_remove_event");
|
TimerContext time = stats.time(dir + "_remove_event");
|
||||||
try {
|
try {
|
||||||
|
@ -97,9 +96,10 @@ public class OverseerTaskQueue extends DistributedQueue {
|
||||||
LOG.info("Response ZK path: " + responsePath + " doesn't exist."
|
LOG.info("Response ZK path: " + responsePath + " doesn't exist."
|
||||||
+ " Requestor may have disconnected from ZooKeeper");
|
+ " Requestor may have disconnected from ZooKeeper");
|
||||||
}
|
}
|
||||||
byte[] data = zookeeper.getData(path, null, null, true);
|
try {
|
||||||
zookeeper.delete(path, -1, true);
|
zookeeper.delete(path, -1, true);
|
||||||
return data;
|
} catch (KeeperException.NoNodeException ignored) {
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
time.stop();
|
time.stop();
|
||||||
}
|
}
|
||||||
|
@ -227,44 +227,26 @@ public class OverseerTaskQueue extends DistributedQueue {
|
||||||
ArrayList<QueueEvent> topN = new ArrayList<>();
|
ArrayList<QueueEvent> topN = new ArrayList<>();
|
||||||
|
|
||||||
LOG.debug("Peeking for top {} elements. ExcludeSet: {}", n, excludeSet);
|
LOG.debug("Peeking for top {} elements. ExcludeSet: {}", n, excludeSet);
|
||||||
TimerContext time = null;
|
TimerContext time;
|
||||||
if (waitMillis == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
|
if (waitMillis == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
|
||||||
else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
|
else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (String headNode : getChildren(waitMillis)) {
|
for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.contains(dir + "/" + child))) {
|
||||||
if (topN.size() < n) {
|
topN.add(new QueueEvent(dir + "/" + element.first(),
|
||||||
try {
|
element.second(), null));
|
||||||
String id = dir + "/" + headNode;
|
|
||||||
if (excludeSet.contains(id)) continue;
|
|
||||||
QueueEvent queueEvent = new QueueEvent(id,
|
|
||||||
zookeeper.getData(dir + "/" + headNode, null, null, true), null);
|
|
||||||
topN.add(queueEvent);
|
|
||||||
} catch (KeeperException.NoNodeException e) {
|
|
||||||
// Another client removed the node first, try next
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (topN.size() >= 1) {
|
|
||||||
printQueueEventsListElementIds(topN);
|
|
||||||
return topN;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
printQueueEventsListElementIds(topN);
|
||||||
if (topN.size() > 0 ) {
|
return topN;
|
||||||
printQueueEventsListElementIds(topN);
|
|
||||||
return topN;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
} finally {
|
} finally {
|
||||||
time.stop();
|
time.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
|
private static void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
|
||||||
if(LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled() && !topN.isEmpty()) {
|
||||||
StringBuffer sb = new StringBuffer("[");
|
StringBuilder sb = new StringBuilder("[");
|
||||||
for(QueueEvent queueEvent: topN) {
|
for (QueueEvent queueEvent : topN) {
|
||||||
sb.append(queueEvent.getId()).append(", ");
|
sb.append(queueEvent.getId()).append(", ");
|
||||||
}
|
}
|
||||||
sb.append("]");
|
sb.append("]");
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.solr.cloud;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
@ -137,6 +136,49 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
|
||||||
assertNull(dq.poll());
|
assertNull(dq.poll());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPeekElements() throws Exception {
|
||||||
|
String dqZNode = "/distqueue/test";
|
||||||
|
byte[] data = "hello world".getBytes(UTF8);
|
||||||
|
|
||||||
|
DistributedQueue dq = makeDistributedQueue(dqZNode);
|
||||||
|
|
||||||
|
// Populate with data.
|
||||||
|
dq.offer(data);
|
||||||
|
dq.offer(data);
|
||||||
|
dq.offer(data);
|
||||||
|
|
||||||
|
// Should be able to get 0, 1, 2, or 3 instantly
|
||||||
|
for (int i = 0; i <= 3; ++i) {
|
||||||
|
assertEquals(i, dq.peekElements(i, 0, child -> true).size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Asking for more should return only 3.
|
||||||
|
assertEquals(3, dq.peekElements(4, 0, child -> true).size());
|
||||||
|
|
||||||
|
// If we filter everything out, we should block for the full time.
|
||||||
|
long start = System.nanoTime();
|
||||||
|
assertEquals(0, dq.peekElements(4, 1000, child -> false).size());
|
||||||
|
assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(500));
|
||||||
|
|
||||||
|
// If someone adds a new matching element while we're waiting, we should return immediately.
|
||||||
|
executor.submit(() -> {
|
||||||
|
try {
|
||||||
|
Thread.sleep(500);
|
||||||
|
dq.offer(data);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
});
|
||||||
|
start = System.nanoTime();
|
||||||
|
assertEquals(1, dq.peekElements(4, 2000, child -> {
|
||||||
|
// The 4th element in the queue will end with a "3".
|
||||||
|
return child.endsWith("3");
|
||||||
|
}).size());
|
||||||
|
assertTrue(System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(1000));
|
||||||
|
assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(250));
|
||||||
|
}
|
||||||
|
|
||||||
private void forceSessionExpire() throws InterruptedException, TimeoutException {
|
private void forceSessionExpire() throws InterruptedException, TimeoutException {
|
||||||
long sessionId = zkClient.getSolrZooKeeper().getSessionId();
|
long sessionId = zkClient.getSolrZooKeeper().getSessionId();
|
||||||
zkServer.expire(sessionId);
|
zkServer.expire(sessionId);
|
||||||
|
|
Loading…
Reference in New Issue