SOLR-11368: Abstract out DistibutedQueue as an interface

This commit is contained in:
Noble Paul 2017-09-20 15:43:45 +09:30
parent a2cd9c12d5
commit 6ba04e3c1f
15 changed files with 68 additions and 54 deletions

View File

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.solr.common.cloud.DistributedQueue;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;

View File

@ -80,10 +80,10 @@ public class Overseer implements Closeable {
private final SolrZkClient zkClient;
private final String myId;
//queue where everybody can throw tasks
private final DistributedQueue stateUpdateQueue;
private final ZkDistributedQueue stateUpdateQueue;
//Internal queue where overseer stores events that have not yet been published into cloudstate
//If Overseer dies while extracting the main queue a new overseer will start from this queue
private final DistributedQueue workQueue;
private final ZkDistributedQueue workQueue;
// Internal map which holds the information about running tasks.
private final DistributedMap runningMap;
// Internal map which holds the information about successfully completed tasks.
@ -612,9 +612,9 @@ public class Overseer implements Closeable {
* This method will create the /overseer znode in ZooKeeper if it does not exist already.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
public static DistributedQueue getStateUpdateQueue(final SolrZkClient zkClient) {
public static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient) {
return getStateUpdateQueue(zkClient, new Stats());
}
@ -625,11 +625,11 @@ public class Overseer implements Closeable {
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @param zkStats a {@link Overseer.Stats} object which tracks statistics for all zookeeper operations performed by this queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static DistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) {
static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/queue", zkStats);
return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats);
}
/**
@ -645,11 +645,11 @@ public class Overseer implements Closeable {
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @param zkStats a {@link Overseer.Stats} object which tracks statistics for all zookeeper operations performed by this queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static DistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/queue-work", zkStats);
return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
}
/* Internal map for failed tasks, not to be used outside of the Overseer */
@ -683,7 +683,7 @@ public class Overseer implements Closeable {
* see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
return getCollectionQueue(zkClient, new Stats());
@ -701,7 +701,7 @@ public class Overseer implements Closeable {
* see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
@ -721,7 +721,7 @@ public class Overseer implements Closeable {
* see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) {
return getConfigSetQueue(zkClient, new Stats());
@ -744,7 +744,7 @@ public class Overseer implements Closeable {
* {@link OverseerConfigSetMessageHandler}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
// For now, we use the same queue as the collection queue, but ensure

View File

@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.cloud.DistributedQueue;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@ -320,7 +321,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
throws KeeperException, InterruptedException {
throws Exception {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
@ -331,7 +332,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
inQueue.offer(Utils.toJSON(m));
}
private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
@ -440,7 +441,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
}
void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {
void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
ZkStateReader.CORE_NAME_PROP, core,
@ -462,7 +463,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
//TODO should we not remove in the next release ?
private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results)
throws KeeperException, InterruptedException {
throws Exception {
final String collectionName = message.getStr(COLLECTION_PROP);
boolean firstLoop = true;
@ -633,7 +634,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results)
throws KeeperException, InterruptedException {
throws Exception {
final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
//the rest of the processing is based on writing cluster state properties

View File

@ -35,11 +35,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link DistributedQueue} augmented with helper methods specific to the overseer task queues.
* A {@link ZkDistributedQueue} augmented with helper methods specific to the overseer task queues.
* Methods specific to this subclass ignore superclass internal state and hit ZK directly.
* This is inefficient! But the API on this class is kind of muddy..
*/
public class OverseerTaskQueue extends DistributedQueue {
public class OverseerTaskQueue extends ZkDistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String RESPONSE_PREFIX = "qnr-" ;

View File

@ -33,6 +33,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.solr.common.cloud.DistributedQueue;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.SolrException;

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.common.cloud.DistributedQueue;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.overseer.OverseerAction;

View File

@ -136,7 +136,7 @@ public class ZkController {
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
private final DistributedQueue overseerJobQueue;
private final ZkDistributedQueue overseerJobQueue;
private final OverseerTaskQueue overseerCollectionQueue;
private final OverseerTaskQueue overseerConfigSetQueue;
@ -1738,7 +1738,7 @@ public class ZkController {
}
}
public DistributedQueue getOverseerJobQueue() {
public ZkDistributedQueue getOverseerJobQueue() {
return overseerJobQueue;
}

View File

@ -30,6 +30,7 @@ import java.util.function.Predicate;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.solr.common.cloud.DistributedQueue;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
@ -47,7 +48,7 @@ import org.slf4j.LoggerFactory;
* multiple-producer: if there are multiple consumers on the same ZK queue,
* the results should be correct but inefficient
*/
public class DistributedQueue {
public class ZkDistributedQueue implements DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final String PREFIX = "qn-";
@ -92,11 +93,11 @@ public class DistributedQueue {
private int watcherCount = 0;
public DistributedQueue(SolrZkClient zookeeper, String dir) {
public ZkDistributedQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats());
}
public DistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) {
public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) {
this.dir = dir;
ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
@ -119,6 +120,7 @@ public class DistributedQueue {
*
* @return data at the first element of the queue, or null.
*/
@Override
public byte[] peek() throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_peek");
try {
@ -135,6 +137,7 @@ public class DistributedQueue {
* @param block if true, blocks until an element enters the queue
* @return data at the first element of the queue, or null.
*/
@Override
public byte[] peek(boolean block) throws KeeperException, InterruptedException {
return block ? peek(Long.MAX_VALUE) : peek();
}
@ -146,6 +149,7 @@ public class DistributedQueue {
* @param wait max wait time in ms.
* @return data at the first element of the queue, or null.
*/
@Override
public byte[] peek(long wait) throws KeeperException, InterruptedException {
Preconditions.checkArgument(wait > 0);
Timer.Context time;
@ -177,6 +181,7 @@ public class DistributedQueue {
*
* @return Head of the queue or null.
*/
@Override
public byte[] poll() throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_poll");
try {
@ -191,6 +196,7 @@ public class DistributedQueue {
*
* @return The former head of the queue
*/
@Override
public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_remove");
try {
@ -209,6 +215,7 @@ public class DistributedQueue {
*
* @return The former head of the queue
*/
@Override
public byte[] take() throws KeeperException, InterruptedException {
// Same as for element. Should refactor this.
Timer.Context timer = stats.time(dir + "_take");
@ -231,6 +238,7 @@ public class DistributedQueue {
* Inserts data into queue. If there are no other queue consumers, the offered element
* will be immediately visible when this method returns.
*/
@Override
public void offer(byte[] data) throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_offer");
try {
@ -323,10 +331,10 @@ public class DistributedQueue {
/**
* Return the currently-known set of elements, using child names from memory. If no children are found, or no
* children pass {@code acceptFilter}, waits up to {@code waitMillis} for at least one child to become available.
* <p/>
* Package-private to support {@link OverseerTaskQueue} specifically.
*/
Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
@Override
public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
List<String> foundChildren = new ArrayList<>();
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
boolean first = true;

View File

@ -41,12 +41,12 @@ import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
import org.apache.solr.common.cloud.DistributedQueue;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.overseer.OverseerAction;

View File

@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.cloud.DistributedQueue;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreStatus;
@ -33,7 +34,6 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.FileUtils;
import org.apache.zookeeper.KeeperException;
import org.junit.BeforeClass;
import org.junit.Test;
@ -83,7 +83,7 @@ public class DeleteShardTest extends SolrCloudTestCase {
}
protected void setSliceState(String collection, String slice, State state) throws SolrServerException, IOException,
KeeperException, InterruptedException {
Exception {
CloudSolrClient client = cluster.getSolrClient();

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.DistributedQueue;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
@ -95,7 +96,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
String dqZNode = "/distqueue/test";
byte[] data = "hello world".getBytes(UTF8);
DistributedQueue consumer = makeDistributedQueue(dqZNode);
ZkDistributedQueue consumer = makeDistributedQueue(dqZNode);
DistributedQueue producer = makeDistributedQueue(dqZNode);
DistributedQueue producer2 = makeDistributedQueue(dqZNode);
@ -124,7 +125,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
String dqZNode = "/distqueue/test";
String testData = "hello world";
DistributedQueue dq = makeDistributedQueue(dqZNode);
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
assertNull(dq.peek());
Future<String> future = executor.submit(() -> new String(dq.peek(true), UTF8));
@ -171,7 +172,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
@Test
public void testLeakChildWatcher() throws Exception {
String dqZNode = "/distqueue/test";
DistributedQueue dq = makeDistributedQueue(dqZNode);
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
@ -207,7 +208,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
@Test
public void testLocallyOffer() throws Exception {
String dqZNode = "/distqueue/test";
DistributedQueue dq = makeDistributedQueue(dqZNode);
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
dq.peekElements(1, 1, s -> true);
for (int i = 0; i < 100; i++) {
byte[] data = String.valueOf(i).getBytes(UTF8);
@ -224,7 +225,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
String dqZNode = "/distqueue/test";
byte[] data = "hello world".getBytes(UTF8);
DistributedQueue dq = makeDistributedQueue(dqZNode);
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
// Populate with data.
dq.offer(data);
@ -280,8 +281,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
assertFalse(sessionId == zkClient.getSolrZooKeeper().getSessionId());
}
protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
return new DistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
protected ZkDistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
return new ZkDistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
}
private static class QueueChangerThread extends Thread {

View File

@ -204,7 +204,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
protected void unsetLeader(String collection, String slice) throws Exception {
DistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
ZkDistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
@ -232,7 +232,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException,
KeeperException, InterruptedException {
DistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
ZkDistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());

View File

@ -23,6 +23,7 @@ import java.util.Random;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.cloud.DistributedQueue;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
@ -68,7 +69,7 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
private void testFillWorkQueue() throws Exception {
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(),
DistributedQueue distributedQueue = new ZkDistributedQueue(cloudClient.getZkStateReader().getZkClient(),
"/overseer/collection-queue-work", new Overseer.Stats());
//fill the work queue with blocked tasks by adding more than the no:of parallel tasks
for (int i = 0; i < MAX_PARALLEL_TASKS+5; i++) {
@ -149,7 +150,7 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
private void testTaskExclusivity() throws Exception, SolrServerException {
DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(),
DistributedQueue distributedQueue = new ZkDistributedQueue(cloudClient.getZkStateReader().getZkClient(),
"/overseer/collection-queue-work", new Overseer.Stats());
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {

View File

@ -136,7 +136,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.NUM_SHARDS_PROP, numShards+"",
"createNodeSet", "");
DistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
q.offer(Utils.toJSON(m));
}
@ -153,7 +153,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName,
ZkStateReader.COLLECTION_PROP, collection);
DistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
q.offer(Utils.toJSON(m));
return null;
} else {
@ -166,7 +166,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.SHARD_ID_PROP, shard,
ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
ZkStateReader.BASE_URL_PROP, "http://" + nodeName + "/solr/");
DistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
q.offer(Utils.toJSON(m));
}
@ -287,7 +287,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.NUM_SHARDS_PROP, "3",
"createNodeSet", "");
DistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
q.offer(Utils.toJSON(m));
for (int i = 0; i < numShards; i++) {
@ -426,7 +426,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
overseerClient = electNewOverseer(server.getZkAddress());
DistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
"name", COLLECTION,
@ -844,7 +844,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.MAX_SHARDS_PER_NODE, "1"
);
DistributedQueue q = Overseer.getStateUpdateQueue(controllerClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient);
q.offer(Utils.toJSON(m));
controllerClient.makePath("/collections/perf" + i, true);
}
@ -859,7 +859,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.NUM_SHARDS_PROP, "1",
ZkStateReader.BASE_URL_PROP, "http://" + "node1"
+ "/solr/");
DistributedQueue q = Overseer.getStateUpdateQueue(controllerClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient);
q.offer(Utils.toJSON(m));
if (j >= MAX_COLLECTIONS - 1) j = 0;
if (k >= MAX_CORES - 1) k = 0;
@ -876,7 +876,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.NUM_SHARDS_PROP, "1",
ZkStateReader.BASE_URL_PROP, "http://" + "node1"
+ "/solr/");
DistributedQueue q = Overseer.getStateUpdateQueue(controllerClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient);
q.offer(Utils.toJSON(m));
Timer t = new Timer();
@ -966,7 +966,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
//prepopulate work queue with some items to emulate previous overseer died before persisting state
DistributedQueue queue = Overseer.getInternalWorkQueue(zkClient, new Overseer.Stats());
ZkDistributedQueue queue = Overseer.getInternalWorkQueue(zkClient, new Overseer.Stats());
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
"name", COLLECTION,
@ -1048,7 +1048,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
overseerClient = electNewOverseer(server.getZkAddress());
DistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
@ -1209,7 +1209,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
overseerClient = electNewOverseer(server.getZkAddress());
DistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
// create collection
{

View File

@ -179,7 +179,7 @@ public class TestRandomRequestDistribution extends AbstractFullDistribZkTestBase
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
log.info("Forcing {} to go into 'down' state", notLeader.getStr(ZkStateReader.CORE_NAME_PROP));
DistributedQueue q = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
ZkDistributedQueue q = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
q.offer(Utils.toJSON(m));
verifyReplicaStatus(cloudClient.getZkStateReader(), "football", "shard1", notLeader.getName(), Replica.State.DOWN);