mirror of https://github.com/apache/lucene.git
SOLR-3647: DistrubtedQueue should use our Solr zk client rather than the std zk client.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1363781 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
59b658251e
commit
b2796c1d82
|
@ -23,12 +23,12 @@ import java.util.NoSuchElementException;
|
|||
import java.util.TreeMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooDefs;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -42,12 +42,12 @@ public class DistributedQueue {
|
|||
|
||||
private final String dir;
|
||||
|
||||
private ZooKeeper zookeeper;
|
||||
private SolrZkClient zookeeper;
|
||||
private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
|
||||
|
||||
private final String prefix = "qn-";
|
||||
|
||||
public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl) {
|
||||
public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) {
|
||||
this.dir = dir;
|
||||
|
||||
if (acl != null) {
|
||||
|
@ -70,7 +70,7 @@ public class DistributedQueue {
|
|||
|
||||
List<String> childNames = null;
|
||||
try {
|
||||
childNames = zookeeper.getChildren(dir, watcher);
|
||||
childNames = zookeeper.getChildren(dir, watcher, true);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
throw e;
|
||||
}
|
||||
|
@ -124,7 +124,7 @@ public class DistributedQueue {
|
|||
for (String headNode : orderedChildren.values()) {
|
||||
if (headNode != null) {
|
||||
try {
|
||||
return zookeeper.getData(dir + "/" + headNode, false, null);
|
||||
return zookeeper.getData(dir + "/" + headNode, null, null, true);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
// Another client removed the node first, try next
|
||||
}
|
||||
|
@ -156,8 +156,8 @@ public class DistributedQueue {
|
|||
for (String headNode : orderedChildren.values()) {
|
||||
String path = dir + "/" + headNode;
|
||||
try {
|
||||
byte[] data = zookeeper.getData(path, false, null);
|
||||
zookeeper.delete(path, -1);
|
||||
byte[] data = zookeeper.getData(path, null, null, true);
|
||||
zookeeper.delete(path, -1, true);
|
||||
return data;
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
// Another client deleted the node first.
|
||||
|
@ -202,7 +202,7 @@ public class DistributedQueue {
|
|||
try {
|
||||
orderedChildren = orderedChildren(childWatcher);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
|
||||
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
|
||||
continue;
|
||||
}
|
||||
if (orderedChildren.size() == 0) {
|
||||
|
@ -213,8 +213,8 @@ public class DistributedQueue {
|
|||
for (String headNode : orderedChildren.values()) {
|
||||
String path = dir + "/" + headNode;
|
||||
try {
|
||||
byte[] data = zookeeper.getData(path, false, null);
|
||||
zookeeper.delete(path, -1);
|
||||
byte[] data = zookeeper.getData(path, null, null, true);
|
||||
zookeeper.delete(path, -1, true);
|
||||
return data;
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
// Another client deleted the node first.
|
||||
|
@ -234,11 +234,11 @@ public class DistributedQueue {
|
|||
for (;;) {
|
||||
try {
|
||||
zookeeper.create(dir + "/" + prefix, data, acl,
|
||||
CreateMode.PERSISTENT_SEQUENTIAL);
|
||||
CreateMode.PERSISTENT_SEQUENTIAL, true);
|
||||
return true;
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
try {
|
||||
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
|
||||
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
|
||||
} catch (KeeperException.NodeExistsException ne) {
|
||||
//someone created it
|
||||
}
|
||||
|
@ -284,7 +284,7 @@ public class DistributedQueue {
|
|||
try {
|
||||
orderedChildren = orderedChildren(childWatcher);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
|
||||
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
|
||||
continue;
|
||||
}
|
||||
if (orderedChildren.size() == 0) {
|
||||
|
@ -295,7 +295,7 @@ public class DistributedQueue {
|
|||
for (String headNode : orderedChildren.values()) {
|
||||
String path = dir + "/" + headNode;
|
||||
try {
|
||||
byte[] data = zookeeper.getData(path, false, null);
|
||||
byte[] data = zookeeper.getData(path, null, null, true);
|
||||
return data;
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
// Another client deleted the node first.
|
||||
|
|
|
@ -422,19 +422,19 @@ public class Overseer {
|
|||
*/
|
||||
public static DistributedQueue getInQueue(final SolrZkClient zkClient) {
|
||||
createOverseerNode(zkClient);
|
||||
return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/queue", null);
|
||||
return new DistributedQueue(zkClient, "/overseer/queue", null);
|
||||
}
|
||||
|
||||
/* Internal queue, not to be used outside of Overseer */
|
||||
static DistributedQueue getInternalQueue(final SolrZkClient zkClient) {
|
||||
createOverseerNode(zkClient);
|
||||
return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/queue-work", null);
|
||||
return new DistributedQueue(zkClient, "/overseer/queue-work", null);
|
||||
}
|
||||
|
||||
/* Collection creation queue */
|
||||
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
|
||||
createOverseerNode(zkClient);
|
||||
return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/collection-queue-work", null);
|
||||
return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null);
|
||||
}
|
||||
|
||||
private static void createOverseerNode(final SolrZkClient zkClient) {
|
||||
|
|
Loading…
Reference in New Issue