SOLR-6760: New optimized DistributedQueue implementation for overseer

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1696706 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2015-08-20 05:12:22 +00:00
parent e41d6588f5
commit 98f8e9fe2e
12 changed files with 788 additions and 555 deletions

View File

@ -151,6 +151,10 @@ Optimizations
is anywhere from 20% to over 100% faster and produces less garbage on average.
(yonik)
* SOLR-6760: New optimized DistributedQueue implementation for overseer increases
message processing performance by ~470%.
(Noble Paul, Scott Blum, shalin)
Other Changes
----------------------

View File

@ -18,19 +18,21 @@
package org.apache.solr.cloud;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@ -40,23 +42,60 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A distributed queue from zk recipes.
* A distributed queue.
*/
public class DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
private static long DEFAULT_TIMEOUT = 5*60*1000;
private final String dir;
private SolrZkClient zookeeper;
private final String prefix = "qn-";
private final String response_prefix = "qnr-" ;
private final Overseer.Stats stats;
static final String PREFIX = "qn-";
/**
* Theory of operation:
* <p>
* Under ordinary circumstances we neither watch nor poll for children in ZK.
* Instead we keep an in-memory list of known child names. When the in-memory
* list is exhausted, we then fetch from ZK.
* <p>
* We only bother setting a child watcher when the queue has no children in ZK.
*/
private static final Object _IMPLEMENTATION_NOTES = null;
final String dir;
final SolrZkClient zookeeper;
final Overseer.Stats stats;
/**
* A lock that guards all of the mutable state that follows.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* Contains the last set of children fetched from ZK. Elements are removed from the head of
* this in-memory set as they are consumed from the queue. Due to the distributed nature
* of the queue, elements may appear in this set whose underlying nodes have been consumed in ZK.
* Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
* like {@link #poll()} must resolve any races by attempting to delete the underlying node.
*/
private TreeSet<String> knownChildren = new TreeSet<>();
/**
* Used to wait on a non-empty queue; you must hold {@link #updateLock} and verify that
* {@link #knownChildren} is empty before waiting on this condition.
*/
private final Condition notEmpty = updateLock.newCondition();
/**
* If non-null, the last watcher to listen for child changes.
*/
private ChildWatcher lastWatcher = null;
/**
* If true, ZK's child list probably doesn't match what's in memory.
*/
private boolean isDirty = true;
public DistributedQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats());
}
@ -77,548 +116,146 @@ public class DistributedQueue {
this.zookeeper = zookeeper;
this.stats = stats;
}
/**
* Returns a Map of the children, ordered by id.
*
* @param watcher
* optional watcher on getChildren() operation.
* @return map from id to child name for all children
*/
private TreeMap<Long,String> orderedChildren(Watcher watcher)
throws KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren = new TreeMap<>();
List<String> childNames = zookeeper.getChildren(dir, watcher, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
try {
// Check format
if (!childName.regionMatches(0, prefix, 0, prefix.length())) {
LOG.debug("Found child node with improper name: " + childName);
continue;
}
String suffix = childName.substring(prefix.length());
Long childId = new Long(suffix);
orderedChildren.put(childId, childName);
} catch (NumberFormatException e) {
LOG.warn("Found child node with improper format : " + childName + " "
+ e, e);
}
}
return orderedChildren;
}
/**
* Returns true if the queue contains a task with the specified async id.
*/
public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {
List<String> childNames = zookeeper.getChildren(dir, null, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
if (childName != null) {
try {
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
if (data != null) {
ZkNodeProps message = ZkNodeProps.load(data);
if (message.containsKey(requestIdKey)) {
LOG.debug(">>>> {}", message.get(requestIdKey));
if(message.get(requestIdKey).equals(requestId)) return true;
}
}
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
}
}
return false;
}
/**
* Return the head of the queue without modifying the queue.
*
* @return the data at the head of the queue.
*/
private QueueEvent element() throws KeeperException,
InterruptedException {
TreeMap<Long,String> orderedChildren;
// element, take, and remove follow the same pattern.
// We want to return the child node with the smallest sequence number.
// Since other clients are remove()ing and take()ing nodes concurrently,
// the child with the smallest sequence number in orderedChildren might be
// gone by the time we check.
// We don't call getChildren again until we have tried the rest of the nodes
// in sequence order.
while (true) {
try {
orderedChildren = orderedChildren(null);
} catch (KeeperException.NoNodeException e) {
return null;
}
if (orderedChildren.size() == 0) return null;
for (String headNode : orderedChildren.values()) {
if (headNode != null) {
try {
return new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode, null, null, true), null);
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
}
}
}
}
/**
* Attempts to remove the head of the queue and return it.
*
* @return The former head of the queue
*/
public byte[] remove() throws NoSuchElementException, KeeperException,
InterruptedException {
TreeMap<Long,String> orderedChildren;
// Same as for element. Should refactor this.
TimerContext time = stats.time(dir + "_remove");
try {
while (true) {
try {
orderedChildren = orderedChildren(null);
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException();
}
if (orderedChildren.size() == 0) throw new NoSuchElementException();
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}
}
}
} finally {
time.stop();
}
}
/**
* Remove the event and save the response into the other path.
*
*/
public byte[] remove(QueueEvent event) throws KeeperException,
InterruptedException {
TimerContext time = stats.time(dir + "_remove_event");
try {
String path = event.getId();
String responsePath = dir + "/" + response_prefix
+ path.substring(path.lastIndexOf("-") + 1);
if (zookeeper.exists(responsePath, true)) {
zookeeper.setData(responsePath, event.getBytes(), true);
}
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} finally {
time.stop();
}
}
/**
* Watcher that blocks until a WatchedEvent occurs for a znode.
*/
private final class LatchWatcher implements Watcher {
private final Object lock;
private WatchedEvent event;
private Event.EventType latchEventType;
LatchWatcher(Object lock) {
this(lock, null);
}
LatchWatcher(Event.EventType eventType) {
this(new Object(), eventType);
}
LatchWatcher(Object lock, Event.EventType eventType) {
this.lock = lock;
this.latchEventType = eventType;
}
@Override
public void process(WatchedEvent event) {
Event.EventType eventType = event.getType();
// None events are ignored
// If latchEventType is not null, only fire if the type matches
if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
synchronized (lock) {
this.event = event;
lock.notifyAll();
}
}
}
public void await(long timeout) throws InterruptedException {
synchronized (lock) {
if (this.event != null) return;
lock.wait(timeout);
}
}
public WatchedEvent getWatchedEvent() {
return event;
}
}
// we avoid creating *many* watches in some cases
// by saving the childrenWatcher and the children associated - see SOLR-6336
private LatchWatcher childrenWatcher;
private TreeMap<Long,String> fetchedChildren;
private final Object childrenWatcherLock = new Object();
private Map<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
{
LatchWatcher watcher;
TreeMap<Long,String> children;
synchronized (childrenWatcherLock) {
watcher = childrenWatcher;
children = fetchedChildren;
}
if (watcher == null || watcher.getWatchedEvent() != null) {
// this watcher is only interested in child change events
watcher = new LatchWatcher(Watcher.Event.EventType.NodeChildrenChanged);
while (true) {
try {
children = orderedChildren(watcher);
break;
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
// go back to the loop and try again
}
}
synchronized (childrenWatcherLock) {
childrenWatcher = watcher;
fetchedChildren = children;
}
}
while (true) {
if (!children.isEmpty()) break;
watcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
if (watcher.getWatchedEvent() != null) {
children = orderedChildren(null);
}
if (wait != Long.MAX_VALUE) break;
}
return Collections.unmodifiableMap(children);
}
/**
* Removes the head of the queue and returns it, blocks until it succeeds.
*
* @return The former head of the queue
*/
public byte[] take() throws KeeperException, InterruptedException {
// Same as for element. Should refactor this.
TimerContext timer = stats.time(dir + "_take");
try {
Map<Long, String> orderedChildren = getChildren(Long.MAX_VALUE);
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}
}
return null; // shouldn't really reach here..
} finally {
timer.stop();
}
}
/**
* Inserts data into queue.
*
* @return true if data was successfully added
*/
public boolean offer(byte[] data) throws KeeperException,
InterruptedException {
TimerContext time = stats.time(dir + "_offer");
try {
return createData(dir + "/" + prefix, data,
CreateMode.PERSISTENT_SEQUENTIAL) != null;
} finally {
time.stop();
}
}
/**
* Inserts data into zookeeper.
*
* @return true if data was successfully added
*/
private String createData(String path, byte[] data, CreateMode mode)
throws KeeperException, InterruptedException {
for (;;) {
try {
return zookeeper.create(path, data, mode, true);
} catch (KeeperException.NoNodeException e) {
try {
zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException ne) {
// someone created it
}
}
}
}
/**
* Offer the data and wait for the response
*
*/
public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
InterruptedException {
TimerContext time = stats.time(dir + "_offer");
try {
String path = createData(dir + "/" + prefix, data,
CreateMode.PERSISTENT_SEQUENTIAL);
String watchID = createData(
dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
null, CreateMode.EPHEMERAL);
Object lock = new Object();
LatchWatcher watcher = new LatchWatcher(lock);
synchronized (lock) {
if (zookeeper.exists(watchID, watcher, true) != null) {
watcher.await(timeout);
}
}
byte[] bytes = zookeeper.getData(watchID, null, null, true);
zookeeper.delete(watchID, -1, true);
return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
} finally {
time.stop();
}
}
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty.
*
*
* @return data at the first element of the queue, or null.
*/
public byte[] peek() throws KeeperException, InterruptedException {
TimerContext time = stats.time(dir + "_peek");
try {
QueueEvent element = element();
if (element == null) return null;
return element.getBytes();
} finally {
time.stop();
}
}
public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, Long wait)
throws KeeperException, InterruptedException {
ArrayList<QueueEvent> topN = new ArrayList<>();
LOG.debug("Peeking for top {} elements. ExcludeSet: " + excludeSet.toString());
TimerContext time = null;
if (wait == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
else time = stats.time(dir + "_peekTopN_wait" + wait);
try {
Map<Long, String> orderedChildren = getChildren(wait);
for (String headNode : orderedChildren.values()) {
if (headNode != null && topN.size() < n) {
try {
String id = dir + "/" + headNode;
if (excludeSet != null && excludeSet.contains(id)) continue;
QueueEvent queueEvent = new QueueEvent(id,
zookeeper.getData(dir + "/" + headNode, null, null, true), null);
topN.add(queueEvent);
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
} else {
if (topN.size() >= 1) {
printQueueEventsListElementIds(topN);
return topN;
}
}
}
if (topN.size() > 0 ) {
printQueueEventsListElementIds(topN);
return topN;
}
return null;
return firstElement();
} finally {
time.stop();
}
}
private void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
if(LOG.isDebugEnabled()) {
StringBuffer sb = new StringBuffer("[");
for(QueueEvent queueEvent: topN) {
sb.append(queueEvent.getId()).append(", ");
}
sb.append("]");
LOG.debug("Returning topN elements: {}", sb.toString());
}
}
/**
*
* Gets last element of the Queue without removing it.
*/
public String getTailId() throws KeeperException, InterruptedException {
TreeMap<Long, String> orderedChildren = null;
orderedChildren = orderedChildren(null);
if(orderedChildren == null || orderedChildren.isEmpty()) return null;
for(String headNode : orderedChildren.descendingMap().values())
if (headNode != null) {
try {
QueueEvent queueEvent = new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode,
null, null, true), null);
return queueEvent.getId();
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
}
return null;
}
public static class QueueEvent {
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
QueueEvent other = (QueueEvent) obj;
if (id == null) {
if (other.id != null) return false;
} else if (!id.equals(other.id)) return false;
return true;
}
private WatchedEvent event = null;
private String id;
private byte[] bytes;
QueueEvent(String id, byte[] bytes, WatchedEvent event) {
this.id = id;
this.bytes = bytes;
this.event = event;
}
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
public byte[] getBytes() {
return bytes;
}
public WatchedEvent getWatchedEvent() {
return event;
}
}
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty and block is false.
*
*
* @param block if true, blocks until an element enters the queue
* @return data at the first element of the queue, or null.
*/
public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
return peek(block ? Long.MAX_VALUE : 0);
public byte[] peek(boolean block) throws KeeperException, InterruptedException {
return block ? peek(Long.MAX_VALUE) : peek();
}
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty after wait ms.
*
*
* @param wait max wait time in ms.
* @return data at the first element of the queue, or null.
*/
public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
TimerContext time = null;
public byte[] peek(long wait) throws KeeperException, InterruptedException {
Preconditions.checkArgument(wait > 0);
TimerContext time;
if (wait == Long.MAX_VALUE) {
time = stats.time(dir + "_peek_wait_forever");
} else {
time = stats.time(dir + "_peek_wait" + wait);
}
updateLock.lockInterruptibly();
try {
if (wait == 0) {
return element();
}
Map<Long, String> orderedChildren = getChildren(wait);
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
return new QueueEvent(path, data, null);
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
while (waitNanos > 0) {
byte[] result = firstElement();
if (result != null) {
return result;
}
waitNanos = notEmpty.awaitNanos(waitNanos);
}
return null;
} finally {
updateLock.unlock();
time.stop();
}
}
/**
* Attempts to remove the head of the queue and return it. Returns null if the
* queue is empty.
*
*
* @return Head of the queue or null.
*/
public byte[] poll() throws KeeperException, InterruptedException {
TimerContext time = stats.time(dir + "_poll");
try {
return remove();
} catch (NoSuchElementException e) {
return null;
return removeFirst();
} finally {
time.stop();
}
}
/**
* Attempts to remove the head of the queue and return it.
*
* @return The former head of the queue
*/
public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
TimerContext time = stats.time(dir + "_remove");
try {
byte[] result = removeFirst();
if (result == null) {
throw new NoSuchElementException();
}
return result;
} finally {
time.stop();
}
}
/**
* Removes the head of the queue and returns it, blocks until it succeeds.
*
* @return The former head of the queue
*/
public byte[] take() throws KeeperException, InterruptedException {
// Same as for element. Should refactor this.
TimerContext timer = stats.time(dir + "_take");
updateLock.lockInterruptibly();
try {
while (true) {
byte[] result = removeFirst();
if (result != null) {
return result;
}
notEmpty.await();
}
} finally {
updateLock.unlock();
timer.stop();
}
}
/**
* Inserts data into queue. Successfully calling this method does NOT guarantee
* that the element will be immediately available in the in-memory queue. In particular,
* calling this method on an empty queue will not necessarily cause {@link #poll()} to
* return the offered element. Use a blocking method if you must wait for the offered
* element to become visible.
*/
public void offer(byte[] data) throws KeeperException, InterruptedException {
TimerContext time = stats.time(dir + "_offer");
try {
while (true) {
try {
// We don't need to explicitly set isDirty here; if there is a watcher, it will
// see the update and set the bit itself; if there is no watcher we can defer
// the update anyway.
zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
return;
} catch (KeeperException.NoNodeException e) {
try {
zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException ne) {
// someone created it
}
}
}
} finally {
time.stop();
}
@ -627,4 +264,164 @@ public class DistributedQueue {
public Overseer.Stats getStats() {
return stats;
}
/**
* Returns the name if the first known child node, or {@code null} if the queue is empty.
* This is the only place {@link #knownChildren} is ever updated!
* The caller must double check that the actual node still exists, since the in-memory
* list is inherently stale.
*/
private String firstChild(boolean remove) throws KeeperException, InterruptedException {
updateLock.lockInterruptibly();
try {
// Try to fetch the first in-memory child.
if (!knownChildren.isEmpty()) {
return remove ? knownChildren.pollFirst() : knownChildren.first();
}
if (lastWatcher != null && !isDirty) {
// No children, no known updates, and a watcher is already set; nothing we can do.
return null;
}
// Try to fetch an updated list of children from ZK.
ChildWatcher newWatcher = new ChildWatcher();
knownChildren = fetchZkChildren(newWatcher);
lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
isDirty = false;
if (knownChildren.isEmpty()) {
return null;
}
notEmpty.signalAll();
return remove ? knownChildren.pollFirst() : knownChildren.first();
} finally {
updateLock.unlock();
}
}
/**
* Return the current set of children from ZK; does not change internal state.
*/
TreeSet<String> fetchZkChildren(Watcher watcher) throws InterruptedException, KeeperException {
while (true) {
try {
TreeSet<String> orderedChildren = new TreeSet<>();
List<String> childNames = zookeeper.getChildren(dir, watcher, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
// Check format
if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
LOG.debug("Found child node with improper name: " + childName);
continue;
}
orderedChildren.add(childName);
}
return orderedChildren;
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
// go back to the loop and try again
}
}
}
/**
* Return the currently-known set of children from memory. If there are no children,
* waits up to {@code waitMillis} for at least one child to become available. May
* update the set of known children.
*/
SortedSet<String> getChildren(long waitMillis) throws KeeperException, InterruptedException {
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
while (waitNanos > 0) {
// Trigger a fetch if needed.
firstElement();
updateLock.lockInterruptibly();
try {
if (!knownChildren.isEmpty()) {
return new TreeSet<>(knownChildren);
}
waitNanos = notEmpty.awaitNanos(waitNanos);
} finally {
updateLock.unlock();
}
}
return Collections.emptySortedSet();
}
/**
* Return the head of the queue without modifying the queue.
*
* @return the data at the head of the queue.
*/
private byte[] firstElement() throws KeeperException, InterruptedException {
while (true) {
String firstChild = firstChild(false);
if (firstChild == null) {
return null;
}
try {
return zookeeper.getData(dir + "/" + firstChild, null, null, true);
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first, remove the in-memory and retry.
updateLock.lockInterruptibly();
try {
knownChildren.remove(firstChild);
} finally {
updateLock.unlock();
}
}
}
}
private byte[] removeFirst() throws KeeperException, InterruptedException {
while (true) {
String firstChild = firstChild(true);
if (firstChild == null) {
return null;
}
try {
String path = dir + "/" + firstChild;
byte[] result = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return result;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first, remove the in-memory and retry.
updateLock.lockInterruptibly();
try {
knownChildren.remove(firstChild);
} finally {
updateLock.unlock();
}
}
}
}
@VisibleForTesting boolean hasWatcher() throws InterruptedException {
updateLock.lockInterruptibly();
try {
return lastWatcher != null;
} finally {
updateLock.unlock();
}
}
private class ChildWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
updateLock.lock();
try {
// this watcher is automatically cleared when fired
if (lastWatcher == this) {
lastWatcher = null;
}
// Do no updates in this thread, just signal state back to client threads.
isDirty = true;
// optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
notEmpty.signalAll();
} finally {
updateLock.unlock();
}
}
}
}

View File

@ -188,7 +188,7 @@ public class Overseer implements Closeable {
}
}
DistributedQueue.QueueEvent head = null;
byte[] head = null;
try {
head = stateUpdateQueue.peek(true);
} catch (KeeperException e) {
@ -207,8 +207,8 @@ public class Overseer implements Closeable {
}
try {
while (head != null) {
final byte[] data = head.getBytes();
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
byte[] data = head;
final ZkNodeProps message = ZkNodeProps.load(data);
log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
// we can batch here because workQueue is our fallback in case a ZK write failed
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
@ -922,13 +922,13 @@ public class Overseer implements Closeable {
}
/* Collection creation queue */
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
static OverseerCollectionQueue getCollectionQueue(final SolrZkClient zkClient) {
return getCollectionQueue(zkClient, new Stats());
}
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
static OverseerCollectionQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/collection-queue-work", zkStats);
return new OverseerCollectionQueue(zkClient, "/overseer/collection-queue-work", zkStats);
}
private static void createOverseerNode(final SolrZkClient zkClient) {

View File

@ -53,7 +53,7 @@ public class OverseerCollectionProcessor extends OverseerProcessor {
Overseer.Stats stats,
Overseer overseer,
OverseerNodePrioritizer overseerNodePrioritizer,
DistributedQueue workQueue,
OverseerCollectionQueue workQueue,
DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {

View File

@ -0,0 +1,324 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link DistributedQueue} augmented with helper methods specific to the collection queue.
* Methods specific to this subclass ignore superclass internal state and hit ZK directly.
* This is inefficient! But the API on this class is kind of muddy..
*/
public class OverseerCollectionQueue extends DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(OverseerCollectionQueue.class);
private final String response_prefix = "qnr-" ;
public OverseerCollectionQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats());
}
public OverseerCollectionQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) {
super(zookeeper, dir, stats);
}
/**
* Returns true if the queue contains a task with the specified async id.
*/
public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {
List<String> childNames = zookeeper.getChildren(dir, null, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
if (childName != null) {
try {
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
if (data != null) {
ZkNodeProps message = ZkNodeProps.load(data);
if (message.containsKey(requestIdKey)) {
LOG.debug(">>>> {}", message.get(requestIdKey));
if(message.get(requestIdKey).equals(requestId)) return true;
}
}
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
}
}
return false;
}
/**
* Remove the event and save the response into the other path.
*
*/
public byte[] remove(QueueEvent event) throws KeeperException,
InterruptedException {
TimerContext time = stats.time(dir + "_remove_event");
try {
String path = event.getId();
String responsePath = dir + "/" + response_prefix
+ path.substring(path.lastIndexOf("-") + 1);
if (zookeeper.exists(responsePath, true)) {
zookeeper.setData(responsePath, event.getBytes(), true);
}
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} finally {
time.stop();
}
}
/**
* Watcher that blocks until a WatchedEvent occurs for a znode.
*/
private final class LatchWatcher implements Watcher {
private final Object lock;
private WatchedEvent event;
private Event.EventType latchEventType;
LatchWatcher(Object lock) {
this(lock, null);
}
LatchWatcher(Event.EventType eventType) {
this(new Object(), eventType);
}
LatchWatcher(Object lock, Event.EventType eventType) {
this.lock = lock;
this.latchEventType = eventType;
}
@Override
public void process(WatchedEvent event) {
Event.EventType eventType = event.getType();
// None events are ignored
// If latchEventType is not null, only fire if the type matches
if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
synchronized (lock) {
this.event = event;
lock.notifyAll();
}
}
}
public void await(long timeout) throws InterruptedException {
synchronized (lock) {
if (this.event != null) return;
lock.wait(timeout);
}
}
public WatchedEvent getWatchedEvent() {
return event;
}
}
/**
* Inserts data into zookeeper.
*
* @return true if data was successfully added
*/
private String createData(String path, byte[] data, CreateMode mode)
throws KeeperException, InterruptedException {
for (;;) {
try {
return zookeeper.create(path, data, mode, true);
} catch (KeeperException.NoNodeException e) {
try {
zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException ne) {
// someone created it
}
}
}
}
/**
* Offer the data and wait for the response
*
*/
public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
InterruptedException {
TimerContext time = stats.time(dir + "_offer");
try {
String path = createData(dir + "/" + PREFIX, data,
CreateMode.PERSISTENT_SEQUENTIAL);
String watchID = createData(
dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
null, CreateMode.EPHEMERAL);
Object lock = new Object();
LatchWatcher watcher = new LatchWatcher(lock);
synchronized (lock) {
if (zookeeper.exists(watchID, watcher, true) != null) {
watcher.await(timeout);
}
}
byte[] bytes = zookeeper.getData(watchID, null, null, true);
zookeeper.delete(watchID, -1, true);
return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
} finally {
time.stop();
}
}
public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
throws KeeperException, InterruptedException {
ArrayList<QueueEvent> topN = new ArrayList<>();
LOG.debug("Peeking for top {} elements. ExcludeSet: {}", n, excludeSet);
TimerContext time = null;
if (waitMillis == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
try {
for (String headNode : getChildren(waitMillis)) {
if (topN.size() < n) {
try {
String id = dir + "/" + headNode;
if (excludeSet.contains(id)) continue;
QueueEvent queueEvent = new QueueEvent(id,
zookeeper.getData(dir + "/" + headNode, null, null, true), null);
topN.add(queueEvent);
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
} else {
if (topN.size() >= 1) {
printQueueEventsListElementIds(topN);
return topN;
}
}
}
if (topN.size() > 0 ) {
printQueueEventsListElementIds(topN);
return topN;
}
return null;
} finally {
time.stop();
}
}
private static void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
if(LOG.isDebugEnabled()) {
StringBuffer sb = new StringBuffer("[");
for(QueueEvent queueEvent: topN) {
sb.append(queueEvent.getId()).append(", ");
}
sb.append("]");
LOG.debug("Returning topN elements: {}", sb.toString());
}
}
/**
*
* Gets last element of the Queue without removing it.
*/
public String getTailId() throws KeeperException, InterruptedException {
// TODO: could we use getChildren here? Unsure what freshness guarantee the caller needs.
TreeSet<String> orderedChildren = fetchZkChildren(null);
for (String headNode : orderedChildren.descendingSet())
if (headNode != null) {
try {
QueueEvent queueEvent = new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode,
null, null, true), null);
return queueEvent.getId();
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
}
return null;
}
public static class QueueEvent {
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
QueueEvent other = (QueueEvent) obj;
if (id == null) {
if (other.id != null) return false;
} else if (!id.equals(other.id)) return false;
return true;
}
private WatchedEvent event = null;
private String id;
private byte[] bytes;
QueueEvent(String id, byte[] bytes, WatchedEvent event) {
this.id = id;
this.bytes = bytes;
this.event = event;
}
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
public byte[] getBytes() {
return bytes;
}
public WatchedEvent getWatchedEvent() {
return event;
}
}
}

View File

@ -29,9 +29,8 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@ -66,7 +65,7 @@ public class OverseerProcessor implements Runnable, Closeable {
private static Logger log = LoggerFactory
.getLogger(OverseerProcessor.class);
private DistributedQueue workQueue;
private OverseerCollectionQueue workQueue;
private DistributedMap runningMap;
private DistributedMap completedMap;
private DistributedMap failureMap;
@ -105,7 +104,7 @@ public class OverseerProcessor implements Runnable, Closeable {
Overseer.Stats stats,
OverseerMessageHandlerSelector selector,
OverseerNodePrioritizer prioritizer,
DistributedQueue workQueue,
OverseerCollectionQueue workQueue,
DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {

View File

@ -116,7 +116,7 @@ public final class ZkController {
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
private final DistributedQueue overseerJobQueue;
private final DistributedQueue overseerCollectionQueue;
private final OverseerCollectionQueue overseerCollectionQueue;
private final DistributedMap overseerRunningMap;
private final DistributedMap overseerCompletedMap;
@ -1767,7 +1767,7 @@ public final class ZkController {
return overseerJobQueue;
}
public DistributedQueue getOverseerCollectionQueue() {
public OverseerCollectionQueue getOverseerCollectionQueue() {
return overseerCollectionQueue;
}

View File

@ -36,8 +36,8 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedMap;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.OverseerCollectionQueue;
import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.overseer.SliceMutator;
@ -252,7 +252,7 @@ public class CollectionsHandler extends RequestHandlerBase {
}
private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
OverseerCollectionQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
return collectionQueue.containsTaskWithRequestId(ASYNC, asyncId);
}

View File

@ -21,8 +21,8 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.LeaderInitiatedRecoveryThread;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.overseer.OverseerAction;

View File

@ -16,13 +16,18 @@ package org.apache.solr.cloud;
* the License.
*/
import java.io.File;
import java.nio.charset.Charset;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -33,6 +38,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
protected ZkTestServer zkServer;
protected SolrZkClient zkClient;
protected ExecutorService executor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("dqtest-"));
@Before
@Override
@ -44,37 +50,110 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
@Test
public void testDistributedQueue() throws Exception {
String dqZNode = "/distqueue/test";
String testData = "hello world";
long timeoutMs = 500L;
byte[] data = "hello world".getBytes(UTF8);
DistributedQueue dq = new DistributedQueue(zkClient, setupDistributedQueueZNode(dqZNode));
DistributedQueue dq = makeDistributedQueue(dqZNode);
// basic ops
assertTrue(dq.poll() == null);
byte[] data = testData.getBytes(UTF8);
assertNull(dq.poll());
try {
dq.remove();
fail("NoSuchElementException expected");
} catch (NoSuchElementException expected) {
// expected
}
dq.offer(data);
assertEquals(new String(dq.peek(),UTF8), testData);
assertEquals(new String(dq.take(),UTF8), testData);
assertTrue(dq.poll() == null);
QueueEvent qe = dq.offer(data, timeoutMs);
assertNotNull(qe);
assertEquals(new String(dq.remove(),UTF8), testData);
assertArrayEquals(dq.peek(500), data);
assertArrayEquals(dq.remove(), data);
assertNull(dq.poll());
dq.offer(data);
assertArrayEquals(dq.take(), data); // waits for data
assertNull(dq.poll());
dq.offer(data);
dq.peek(true); // wait until data is definitely there before calling remove
assertArrayEquals(dq.remove(), data);
assertNull(dq.poll());
// should block until the background thread makes the offer
(new QueueChangerThread(dq, 1000)).start();
qe = dq.peek(true);
assertNotNull(qe);
dq.remove();
assertNotNull(dq.peek(true));
assertNotNull(dq.remove());
assertNull(dq.poll());
// timeout scenario ... background thread won't offer until long after the peek times out
QueueChangerThread qct = new QueueChangerThread(dq, 1000);
qct.start();
qe = dq.peek(500);
assertTrue(qe == null);
assertNull(dq.peek(500));
qct.join();
}
@Test
public void testDistributedQueueBlocking() throws Exception {
String dqZNode = "/distqueue/test";
String testData = "hello world";
DistributedQueue dq = makeDistributedQueue(dqZNode);
assertNull(dq.peek());
Future<String> future = executor.submit(() -> new String(dq.peek(true), UTF8));
try {
qct.interrupt();
} catch (Exception exc) {}
future.get(1000, TimeUnit.MILLISECONDS);
fail("TimeoutException expected");
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
// Ultimately trips the watcher, triggering child refresh
dq.offer(testData.getBytes(UTF8));
assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
assertNotNull(dq.poll());
// After draining the queue, a watcher should be set.
assertNull(dq.peek(100));
assertTrue(dq.hasWatcher());
forceSessionExpire();
// Session expiry should have fired the watcher.
Thread.sleep(100);
assertFalse(dq.hasWatcher());
// Rerun the earlier test make sure updates are still seen, post reconnection.
future = executor.submit(() -> new String(dq.peek(true), UTF8));
try {
future.get(1000, TimeUnit.MILLISECONDS);
fail("TimeoutException expected");
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
// Ultimately trips the watcher, triggering child refresh
dq.offer(testData.getBytes(UTF8));
assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
assertNotNull(dq.poll());
assertNull(dq.poll());
}
private void forceSessionExpire() throws InterruptedException, TimeoutException {
long sessionId = zkClient.getSolrZooKeeper().getSessionId();
zkServer.expire(sessionId);
zkClient.getConnectionManager().waitForDisconnected(10000);
zkClient.getConnectionManager().waitForConnected(10000);
for (int i = 0; i < 100; ++i) {
if (zkClient.isConnected()) {
break;
}
Thread.sleep(50);
}
assertTrue(zkClient.isConnected());
assertFalse(sessionId == zkClient.getSolrZooKeeper().getSessionId());
}
protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
return new DistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
}
private class QueueChangerThread extends Thread {
@ -99,7 +178,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
}
}
protected String setupDistributedQueueZNode(String znodePath) throws Exception {
protected String setupNewDistributedQueueZNode(String znodePath) throws Exception {
if (!zkClient.exists("/", true))
zkClient.makePath("/", false, true);
if (zkClient.exists(znodePath, true))
@ -113,8 +192,10 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
public void tearDown() throws Exception {
try {
super.tearDown();
} catch (Exception exc) {}
} catch (Exception exc) {
}
closeZk();
executor.shutdown();
}
protected void setupZk() throws Exception {

View File

@ -20,7 +20,7 @@ package org.apache.solr.cloud;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient;
@ -79,7 +79,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
private static final String COLLECTION_NAME = "mycollection";
private static final String CONFIG_NAME = "myconfig";
private static DistributedQueue workQueueMock;
private static OverseerCollectionQueue workQueueMock;
private static DistributedMap runningMapMock;
private static DistributedMap completedMapMock;
private static DistributedMap failureMapMock;
@ -105,7 +105,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
String myId, ShardHandlerFactory shardHandlerFactory,
String adminPath,
DistributedQueue workQueue, DistributedMap runningMap,
OverseerCollectionQueue workQueue, DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), null, new OverseerNodePrioritizer(zkStateReader, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
@ -120,7 +120,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
@BeforeClass
public static void setUpOnce() throws Exception {
workQueueMock = createMock(DistributedQueue.class);
workQueueMock = createMock(OverseerCollectionQueue.class);
runningMapMock = createMock(DistributedMap.class);
completedMapMock = createMock(DistributedMap.class);
failureMapMock = createMock(DistributedMap.class);

View File

@ -0,0 +1,28 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
public class OverseerCollectionQueueTest extends DistributedQueueTest {
// TODO: OverseerCollectionQueue specific tests.
@Override
protected OverseerCollectionQueue makeDistributedQueue(String dqZNode) throws Exception {
return new OverseerCollectionQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
}
}