mirror of https://github.com/apache/lucene.git
SOLR-8152: Overseer Task Processor/Queue can miss responses, leading to timeouts
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1708539 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
33850419bd
commit
a87883cc5f
|
@ -266,6 +266,9 @@ Bug Fixes
|
|||
|
||||
* SOLR-8128: Set the locale specified by v.locale for all VelocityResponseWriter tools that extend LocaleConfig.
|
||||
(Erik Hatcher)
|
||||
|
||||
* SOLR-8152: Overseer Task Processor/Queue can miss responses, leading to timeouts.
|
||||
(Gregory Chanan)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.zookeeper.CreateMode;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -91,6 +92,9 @@ public class OverseerTaskQueue extends DistributedQueue {
|
|||
+ path.substring(path.lastIndexOf("-") + 1);
|
||||
if (zookeeper.exists(responsePath, true)) {
|
||||
zookeeper.setData(responsePath, event.getBytes(), true);
|
||||
} else {
|
||||
LOG.info("Response ZK path: " + responsePath + " doesn't exist."
|
||||
+ " Requestor may have disconnected from ZooKeeper");
|
||||
}
|
||||
byte[] data = zookeeper.getData(path, null, null, true);
|
||||
zookeeper.delete(path, -1, true);
|
||||
|
@ -127,8 +131,8 @@ public class OverseerTaskQueue extends DistributedQueue {
|
|||
Event.EventType eventType = event.getType();
|
||||
// None events are ignored
|
||||
// If latchEventType is not null, only fire if the type matches
|
||||
LOG.info("{} fired on path {} state {} latchEventType {}", eventType, event.getPath(), event.getState(), latchEventType);
|
||||
if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
|
||||
LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
|
||||
synchronized (lock) {
|
||||
this.event = event;
|
||||
lock.notifyAll();
|
||||
|
@ -176,22 +180,31 @@ public class OverseerTaskQueue extends DistributedQueue {
|
|||
InterruptedException {
|
||||
TimerContext time = stats.time(dir + "_offer");
|
||||
try {
|
||||
String path = createData(dir + "/" + PREFIX, data,
|
||||
CreateMode.PERSISTENT_SEQUENTIAL);
|
||||
// Create and watch the response node before creating the request node;
|
||||
// otherwise we may miss the response.
|
||||
String watchID = createData(
|
||||
dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
|
||||
null, CreateMode.EPHEMERAL);
|
||||
dir + "/" + response_prefix,
|
||||
null, CreateMode.EPHEMERAL_SEQUENTIAL);
|
||||
|
||||
Object lock = new Object();
|
||||
LatchWatcher watcher = new LatchWatcher(lock);
|
||||
Stat stat = zookeeper.exists(watchID, watcher, true);
|
||||
|
||||
// create the request node
|
||||
createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
|
||||
data, CreateMode.PERSISTENT);
|
||||
|
||||
synchronized (lock) {
|
||||
if (zookeeper.exists(watchID, watcher, true) != null) {
|
||||
if (stat != null && watcher.getWatchedEvent() == null) {
|
||||
watcher.await(timeout);
|
||||
}
|
||||
}
|
||||
byte[] bytes = zookeeper.getData(watchID, null, null, true);
|
||||
// create the event before deleting the node, otherwise we can get the deleted
|
||||
// event from the watcher.
|
||||
QueueEvent event = new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
|
||||
zookeeper.delete(watchID, -1, true);
|
||||
return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
|
||||
return event;
|
||||
} finally {
|
||||
time.stop();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue