diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java index 43bd6bd3f66..fd868bc51cc 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 7dd85bc8446..45625dca77b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -80,10 +80,10 @@ public class Overseer implements Closeable { private final SolrZkClient zkClient; private final String myId; //queue where everybody can throw tasks - private final DistributedQueue stateUpdateQueue; + private final ZkDistributedQueue stateUpdateQueue; //Internal queue where overseer stores events that have not yet been published into cloudstate //If Overseer dies while extracting the main queue a new overseer will start from this queue - private final DistributedQueue workQueue; + private final ZkDistributedQueue workQueue; // Internal map which holds the information about running tasks. private final DistributedMap runningMap; // Internal map which holds the information about successfully completed tasks. @@ -612,9 +612,9 @@ public class Overseer implements Closeable { * This method will create the /overseer znode in ZooKeeper if it does not exist already. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link DistributedQueue} object + * @return a {@link ZkDistributedQueue} object */ - public static DistributedQueue getStateUpdateQueue(final SolrZkClient zkClient) { + public static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient) { return getStateUpdateQueue(zkClient, new Stats()); } @@ -625,11 +625,11 @@ public class Overseer implements Closeable { * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue * @param zkStats a {@link Overseer.Stats} object which tracks statistics for all zookeeper operations performed by this queue - * @return a {@link DistributedQueue} object + * @return a {@link ZkDistributedQueue} object */ - static DistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) { + static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) { createOverseerNode(zkClient); - return new DistributedQueue(zkClient, "/overseer/queue", zkStats); + return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats); } /** @@ -645,11 +645,11 @@ public class Overseer implements Closeable { * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue * @param zkStats a {@link Overseer.Stats} object which tracks statistics for all zookeeper operations performed by this queue - * @return a {@link DistributedQueue} object + * @return a {@link ZkDistributedQueue} object */ - static DistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) { + static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) { createOverseerNode(zkClient); - return new DistributedQueue(zkClient, "/overseer/queue-work", zkStats); + return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats); } /* Internal map for failed tasks, not to be used outside of the Overseer */ @@ -683,7 +683,7 @@ public class Overseer implements Closeable { * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link DistributedQueue} object + * @return a {@link ZkDistributedQueue} object */ static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) { return getCollectionQueue(zkClient, new Stats()); @@ -701,7 +701,7 @@ public class Overseer implements Closeable { * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link DistributedQueue} object + * @return a {@link ZkDistributedQueue} object */ static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) { createOverseerNode(zkClient); @@ -721,7 +721,7 @@ public class Overseer implements Closeable { * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link DistributedQueue} object + * @return a {@link ZkDistributedQueue} object */ static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) { return getConfigSetQueue(zkClient, new Stats()); @@ -744,7 +744,7 @@ public class Overseer implements Closeable { * {@link OverseerConfigSetMessageHandler}. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link DistributedQueue} object + * @return a {@link ZkDistributedQueue} object */ static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) { // For now, we use the same queue as the collection queue, but ensure diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java index 095578f35b9..bd760f2f477 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java @@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.commons.lang.StringUtils; import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException; import org.apache.solr.client.solrj.request.AbstractUpdateRequest; @@ -320,7 +321,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler } private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results) - throws KeeperException, InterruptedException { + throws Exception { checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP); SolrZkClient zkClient = zkStateReader.getZkClient(); DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient); @@ -331,7 +332,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler inQueue.offer(Utils.toJSON(m)); } - private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException { + private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP))) { throw new SolrException(ErrorCode.BAD_REQUEST, "The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP + @@ -440,7 +441,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler } } - void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException { + void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception { ZkNodeProps m = new ZkNodeProps( Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, core, @@ -462,7 +463,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler //TODO should we not remove in the next release ? private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results) - throws KeeperException, InterruptedException { + throws Exception { final String collectionName = message.getStr(COLLECTION_PROP); boolean firstLoop = true; @@ -633,7 +634,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) - throws KeeperException, InterruptedException { + throws Exception { final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); //the rest of the processing is based on writing cluster state properties diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java index 92e34cfe498..b381c3b1c7f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java @@ -35,11 +35,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A {@link DistributedQueue} augmented with helper methods specific to the overseer task queues. + * A {@link ZkDistributedQueue} augmented with helper methods specific to the overseer task queues. * Methods specific to this subclass ignore superclass internal state and hit ZK directly. * This is inefficient! But the API on this class is kind of muddy.. */ -public class OverseerTaskQueue extends DistributedQueue { +public class OverseerTaskQueue extends ZkDistributedQueue { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final String RESPONSE_PREFIX = "qnr-" ; diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java index fa493c7cf90..49720c0906f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java @@ -33,6 +33,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.SolrException; diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java index 8c69f54af0a..71ed86434c9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd; import org.apache.solr.cloud.overseer.OverseerAction; diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index c79390c0c5c..b3793af6e17 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -136,7 +136,7 @@ public class ZkController { private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery"); - private final DistributedQueue overseerJobQueue; + private final ZkDistributedQueue overseerJobQueue; private final OverseerTaskQueue overseerCollectionQueue; private final OverseerTaskQueue overseerConfigSetQueue; @@ -1738,7 +1738,7 @@ public class ZkController { } } - public DistributedQueue getOverseerJobQueue() { + public ZkDistributedQueue getOverseerJobQueue() { return overseerJobQueue; } diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java similarity index 96% rename from solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java rename to solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java index cfd31445b63..5b4472aaf94 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java @@ -30,6 +30,7 @@ import java.util.function.Predicate; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.SolrZkClient; @@ -47,7 +48,7 @@ import org.slf4j.LoggerFactory; * multiple-producer: if there are multiple consumers on the same ZK queue, * the results should be correct but inefficient */ -public class DistributedQueue { +public class ZkDistributedQueue implements DistributedQueue { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); static final String PREFIX = "qn-"; @@ -92,11 +93,11 @@ public class DistributedQueue { private int watcherCount = 0; - public DistributedQueue(SolrZkClient zookeeper, String dir) { + public ZkDistributedQueue(SolrZkClient zookeeper, String dir) { this(zookeeper, dir, new Overseer.Stats()); } - public DistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) { + public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) { this.dir = dir; ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout()); @@ -119,6 +120,7 @@ public class DistributedQueue { * * @return data at the first element of the queue, or null. */ + @Override public byte[] peek() throws KeeperException, InterruptedException { Timer.Context time = stats.time(dir + "_peek"); try { @@ -135,6 +137,7 @@ public class DistributedQueue { * @param block if true, blocks until an element enters the queue * @return data at the first element of the queue, or null. */ + @Override public byte[] peek(boolean block) throws KeeperException, InterruptedException { return block ? peek(Long.MAX_VALUE) : peek(); } @@ -146,6 +149,7 @@ public class DistributedQueue { * @param wait max wait time in ms. * @return data at the first element of the queue, or null. */ + @Override public byte[] peek(long wait) throws KeeperException, InterruptedException { Preconditions.checkArgument(wait > 0); Timer.Context time; @@ -177,6 +181,7 @@ public class DistributedQueue { * * @return Head of the queue or null. */ + @Override public byte[] poll() throws KeeperException, InterruptedException { Timer.Context time = stats.time(dir + "_poll"); try { @@ -191,6 +196,7 @@ public class DistributedQueue { * * @return The former head of the queue */ + @Override public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException { Timer.Context time = stats.time(dir + "_remove"); try { @@ -209,6 +215,7 @@ public class DistributedQueue { * * @return The former head of the queue */ + @Override public byte[] take() throws KeeperException, InterruptedException { // Same as for element. Should refactor this. Timer.Context timer = stats.time(dir + "_take"); @@ -231,6 +238,7 @@ public class DistributedQueue { * Inserts data into queue. If there are no other queue consumers, the offered element * will be immediately visible when this method returns. */ + @Override public void offer(byte[] data) throws KeeperException, InterruptedException { Timer.Context time = stats.time(dir + "_offer"); try { @@ -323,10 +331,10 @@ public class DistributedQueue { /** * Return the currently-known set of elements, using child names from memory. If no children are found, or no * children pass {@code acceptFilter}, waits up to {@code waitMillis} for at least one child to become available. - *

* Package-private to support {@link OverseerTaskQueue} specifically. */ - Collection> peekElements(int max, long waitMillis, Predicate acceptFilter) throws KeeperException, InterruptedException { + @Override + public Collection> peekElements(int max, long waitMillis, Predicate acceptFilter) throws KeeperException, InterruptedException { List foundChildren = new ArrayList<>(); long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis); boolean first = true; diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 45f6ea2ebca..ad23db12c84 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -41,12 +41,12 @@ import org.apache.lucene.util.CharsRefBuilder; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrRequest.METHOD; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.GenericSolrRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.SimpleSolrResponse; import org.apache.solr.cloud.CloudDescriptor; -import org.apache.solr.cloud.DistributedQueue; import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.overseer.OverseerAction; diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java index ed3d03b3894..f6afe3aea42 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CoreStatus; @@ -33,7 +34,6 @@ import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.Utils; import org.apache.solr.util.FileUtils; -import org.apache.zookeeper.KeeperException; import org.junit.BeforeClass; import org.junit.Test; @@ -83,7 +83,7 @@ public class DeleteShardTest extends SolrCloudTestCase { } protected void setSliceState(String collection, String slice, State state) throws SolrServerException, IOException, - KeeperException, InterruptedException { + Exception { CloudSolrClient client = cluster.getSolrClient(); diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java index ed33dc1bce0..d91911c2663 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrjNamedThreadFactory; @@ -95,7 +96,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { String dqZNode = "/distqueue/test"; byte[] data = "hello world".getBytes(UTF8); - DistributedQueue consumer = makeDistributedQueue(dqZNode); + ZkDistributedQueue consumer = makeDistributedQueue(dqZNode); DistributedQueue producer = makeDistributedQueue(dqZNode); DistributedQueue producer2 = makeDistributedQueue(dqZNode); @@ -124,7 +125,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { String dqZNode = "/distqueue/test"; String testData = "hello world"; - DistributedQueue dq = makeDistributedQueue(dqZNode); + ZkDistributedQueue dq = makeDistributedQueue(dqZNode); assertNull(dq.peek()); Future future = executor.submit(() -> new String(dq.peek(true), UTF8)); @@ -171,7 +172,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { @Test public void testLeakChildWatcher() throws Exception { String dqZNode = "/distqueue/test"; - DistributedQueue dq = makeDistributedQueue(dqZNode); + ZkDistributedQueue dq = makeDistributedQueue(dqZNode); assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty()); assertEquals(1, dq.watcherCount()); assertFalse(dq.isDirty()); @@ -207,7 +208,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { @Test public void testLocallyOffer() throws Exception { String dqZNode = "/distqueue/test"; - DistributedQueue dq = makeDistributedQueue(dqZNode); + ZkDistributedQueue dq = makeDistributedQueue(dqZNode); dq.peekElements(1, 1, s -> true); for (int i = 0; i < 100; i++) { byte[] data = String.valueOf(i).getBytes(UTF8); @@ -224,7 +225,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { String dqZNode = "/distqueue/test"; byte[] data = "hello world".getBytes(UTF8); - DistributedQueue dq = makeDistributedQueue(dqZNode); + ZkDistributedQueue dq = makeDistributedQueue(dqZNode); // Populate with data. dq.offer(data); @@ -280,8 +281,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { assertFalse(sessionId == zkClient.getSolrZooKeeper().getSessionId()); } - protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception { - return new DistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode)); + protected ZkDistributedQueue makeDistributedQueue(String dqZNode) throws Exception { + return new ZkDistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode)); } private static class QueueChangerThread extends Thread { diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java index 749abdf804d..fdaeeb763ae 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java @@ -204,7 +204,7 @@ public class ForceLeaderTest extends HttpPartitionTest { } protected void unsetLeader(String collection, String slice) throws Exception { - DistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()); + ZkDistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()); ZkStateReader zkStateReader = cloudClient.getZkStateReader(); ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(), @@ -232,7 +232,7 @@ public class ForceLeaderTest extends HttpPartitionTest { protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException, KeeperException, InterruptedException { - DistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()); + ZkDistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()); ZkStateReader zkStateReader = cloudClient.getZkStateReader(); String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName()); diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java index 537dbba86e2..230e172d879 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java @@ -23,6 +23,7 @@ import java.util.Random; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create; @@ -68,7 +69,7 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase { private void testFillWorkQueue() throws Exception { try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) { - DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(), + DistributedQueue distributedQueue = new ZkDistributedQueue(cloudClient.getZkStateReader().getZkClient(), "/overseer/collection-queue-work", new Overseer.Stats()); //fill the work queue with blocked tasks by adding more than the no:of parallel tasks for (int i = 0; i < MAX_PARALLEL_TASKS+5; i++) { @@ -149,7 +150,7 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase { private void testTaskExclusivity() throws Exception, SolrServerException { - DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(), + DistributedQueue distributedQueue = new ZkDistributedQueue(cloudClient.getZkStateReader().getZkClient(), "/overseer/collection-queue-work", new Overseer.Stats()); try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) { diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index b921e39a242..507c18834ce 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -136,7 +136,7 @@ public class OverseerTest extends SolrTestCaseJ4 { ZkStateReader.REPLICATION_FACTOR, "1", ZkStateReader.NUM_SHARDS_PROP, numShards+"", "createNodeSet", ""); - DistributedQueue q = Overseer.getStateUpdateQueue(zkClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient); q.offer(Utils.toJSON(m)); } @@ -153,7 +153,7 @@ public class OverseerTest extends SolrTestCaseJ4 { ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName, ZkStateReader.COLLECTION_PROP, collection); - DistributedQueue q = Overseer.getStateUpdateQueue(zkClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient); q.offer(Utils.toJSON(m)); return null; } else { @@ -166,7 +166,7 @@ public class OverseerTest extends SolrTestCaseJ4 { ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards), ZkStateReader.BASE_URL_PROP, "http://" + nodeName + "/solr/"); - DistributedQueue q = Overseer.getStateUpdateQueue(zkClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient); q.offer(Utils.toJSON(m)); } @@ -287,7 +287,7 @@ public class OverseerTest extends SolrTestCaseJ4 { ZkStateReader.REPLICATION_FACTOR, "1", ZkStateReader.NUM_SHARDS_PROP, "3", "createNodeSet", ""); - DistributedQueue q = Overseer.getStateUpdateQueue(zkClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient); q.offer(Utils.toJSON(m)); for (int i = 0; i < numShards; i++) { @@ -426,7 +426,7 @@ public class OverseerTest extends SolrTestCaseJ4 { overseerClient = electNewOverseer(server.getZkAddress()); - DistributedQueue q = Overseer.getStateUpdateQueue(zkClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient); ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), "name", COLLECTION, @@ -844,7 +844,7 @@ public class OverseerTest extends SolrTestCaseJ4 { ZkStateReader.REPLICATION_FACTOR, "1", ZkStateReader.MAX_SHARDS_PER_NODE, "1" ); - DistributedQueue q = Overseer.getStateUpdateQueue(controllerClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient); q.offer(Utils.toJSON(m)); controllerClient.makePath("/collections/perf" + i, true); } @@ -859,7 +859,7 @@ public class OverseerTest extends SolrTestCaseJ4 { ZkStateReader.NUM_SHARDS_PROP, "1", ZkStateReader.BASE_URL_PROP, "http://" + "node1" + "/solr/"); - DistributedQueue q = Overseer.getStateUpdateQueue(controllerClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient); q.offer(Utils.toJSON(m)); if (j >= MAX_COLLECTIONS - 1) j = 0; if (k >= MAX_CORES - 1) k = 0; @@ -876,7 +876,7 @@ public class OverseerTest extends SolrTestCaseJ4 { ZkStateReader.NUM_SHARDS_PROP, "1", ZkStateReader.BASE_URL_PROP, "http://" + "node1" + "/solr/"); - DistributedQueue q = Overseer.getStateUpdateQueue(controllerClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient); q.offer(Utils.toJSON(m)); Timer t = new Timer(); @@ -966,7 +966,7 @@ public class OverseerTest extends SolrTestCaseJ4 { reader = new ZkStateReader(zkClient); reader.createClusterStateWatchersAndUpdate(); //prepopulate work queue with some items to emulate previous overseer died before persisting state - DistributedQueue queue = Overseer.getInternalWorkQueue(zkClient, new Overseer.Stats()); + ZkDistributedQueue queue = Overseer.getInternalWorkQueue(zkClient, new Overseer.Stats()); ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), "name", COLLECTION, @@ -1048,7 +1048,7 @@ public class OverseerTest extends SolrTestCaseJ4 { overseerClient = electNewOverseer(server.getZkAddress()); - DistributedQueue q = Overseer.getStateUpdateQueue(zkClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient); ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), @@ -1209,7 +1209,7 @@ public class OverseerTest extends SolrTestCaseJ4 { overseerClient = electNewOverseer(server.getZkAddress()); - DistributedQueue q = Overseer.getStateUpdateQueue(zkClient); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient); // create collection { diff --git a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java index 415f80f563b..b53e457e7d1 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java @@ -179,7 +179,7 @@ public class TestRandomRequestDistribution extends AbstractFullDistribZkTestBase ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()); log.info("Forcing {} to go into 'down' state", notLeader.getStr(ZkStateReader.CORE_NAME_PROP)); - DistributedQueue q = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()); + ZkDistributedQueue q = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()); q.offer(Utils.toJSON(m)); verifyReplicaStatus(cloudClient.getZkStateReader(), "football", "shard1", notLeader.getName(), Replica.State.DOWN);