From ce7db65e6f25e567aae6364e6f229d5a501a27da Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Thu, 24 Aug 2017 15:10:29 +0200 Subject: [PATCH] Revert "SOLR-11285: Initial refactoring." This reverts commit aee54ff7d15be96aec7e78dc30edfc78a4f166cf. --- .../org/apache/solr/cloud/AddReplicaCmd.java | 11 +- .../java/org/apache/solr/cloud/Assign.java | 29 ++-- .../org/apache/solr/cloud/CreateShardCmd.java | 10 +- .../org/apache/solr/cloud/DeleteShardCmd.java | 1 - ...ibutedQueue.java => DistributedQueue.java} | 19 +-- .../apache/solr/cloud/ElectionContext.java | 14 +- .../java/org/apache/solr/cloud/Overseer.java | 46 ++---- .../OverseerCollectionMessageHandler.java | 14 +- .../solr/cloud/OverseerNodePrioritizer.java | 2 +- .../apache/solr/cloud/OverseerTaskQueue.java | 4 +- .../apache/solr/cloud/RecoveryStrategy.java | 6 +- .../org/apache/solr/cloud/RestoreCmd.java | 2 +- .../org/apache/solr/cloud/SplitShardCmd.java | 1 - .../org/apache/solr/cloud/ZkController.java | 46 +----- .../solr/cloud/autoscaling/ActionContext.java | 11 +- .../AutoAddReplicasPlanAction.java | 17 +- .../solr/cloud/autoscaling/AutoScaling.java | 63 +++----- .../cloud/autoscaling/AutoScalingHandler.java | 5 +- .../cloud/autoscaling/ComputePlanAction.java | 57 ++++--- .../cloud/autoscaling/ExecutePlanAction.java | 14 +- .../autoscaling/HttpTriggerListener.java | 7 +- .../cloud/autoscaling/NodeAddedTrigger.java | 140 +++++++++++++++-- .../cloud/autoscaling/NodeLostTrigger.java | 135 ++++++++++++++-- .../autoscaling/OverseerTriggerThread.java | 54 ++++--- .../cloud/autoscaling/ScheduledTriggers.java | 46 +++--- .../cloud/autoscaling/SystemLogListener.java | 13 +- .../solr/cloud/autoscaling/TriggerBase.java | 147 ++---------------- .../cloud/autoscaling/TriggerEventQueue.java | 27 ++-- .../cloud/autoscaling/TriggerListener.java | 3 +- .../autoscaling/TriggerListenerBase.java | 8 +- .../ZkDistributedQueueFactory.java | 28 ---- .../org/apache/solr/core/CoreContainer.java | 4 - .../org/apache/solr/core/ZkContainer.java | 2 - .../processor/DistributedUpdateProcessor.java | 2 +- .../apache/solr/cloud/DeleteShardTest.java | 4 +- .../solr/cloud/DistributedQueueTest.java | 11 +- .../apache/solr/cloud/ForceLeaderTest.java | 4 +- .../solr/cloud/MultiThreadedOCPTest.java | 5 +- .../org/apache/solr/cloud/OverseerTest.java | 5 +- .../cloud/TestRandomRequestDistribution.java | 1 - .../AutoAddReplicasPlanActionTest.java | 2 +- .../autoscaling/ExecutePlanActionTest.java | 2 +- .../autoscaling/NodeAddedTriggerTest.java | 22 +-- .../autoscaling/NodeLostTriggerTest.java | 21 +-- .../cloud/autoscaling/TestPolicyCloud.java | 4 +- .../autoscaling/TriggerIntegrationTest.java | 5 +- .../client/solrj/cloud/DistributedQueue.java | 44 ------ .../autoscaling/ClusterDataProvider.java | 92 +---------- .../DelegatingClusterDataProvider.java | 118 -------------- .../solrj/cloud/autoscaling/Policy.java | 2 +- .../solrj/cloud/autoscaling/PolicyHelper.java | 17 +- .../client/solrj/cloud/autoscaling/Row.java | 2 +- .../solrj/impl/SolrClientDataProvider.java | 147 +----------------- .../solrj/cloud/autoscaling/TestPolicy.java | 33 ++-- 54 files changed, 553 insertions(+), 976 deletions(-) rename solr/core/src/java/org/apache/solr/cloud/{ZkDistributedQueue.java => DistributedQueue.java} (96%) delete mode 100644 solr/core/src/java/org/apache/solr/cloud/autoscaling/ZkDistributedQueueFactory.java delete mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java delete mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingClusterDataProvider.java diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java index 5b766b6fb67..6535d341d71 100644 --- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java @@ -18,7 +18,6 @@ package org.apache.solr.cloud; -import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Collection; import java.util.Collections; @@ -72,7 +71,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { } ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) - throws IOException, InterruptedException { + throws KeeperException, InterruptedException { log.debug("addReplica() : {}", Utils.toJSONString(message)); String collection = message.getStr(COLLECTION_PROP); String node = message.getStr(CoreAdminParams.NODE); @@ -121,7 +120,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { } } else { node = getNodesForNewReplicas(clusterState, collection, shard, 1, node, - ocmh.overseer.getClusterDataProvider(), ocmh.overseer.getCoreContainer()).get(0).nodeName;// TODO: use replica type in this logic too + ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;// TODO: use replica type in this logic too } } log.info("Node Identified {} for creating new replica", node); @@ -160,11 +159,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { if (coreNodeName != null) { props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); } - try { - Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props)); - } catch (Exception e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e); - } + Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props)); } params.set(CoreAdminParams.CORE_NODE_NAME, ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName()); diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java index 329d88c1662..f22312f4758 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Assign.java +++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java @@ -31,7 +31,6 @@ import java.util.Set; import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.SolrClientDataProvider; @@ -244,10 +243,10 @@ public class Assign { List shardNames, int numNrtReplicas, int numTlogReplicas, - int numPullReplicas) throws IOException, InterruptedException { + int numPullReplicas) throws KeeperException, InterruptedException { List rulesMap = (List) message.get("rule"); String policyName = message.getStr(POLICY); - AutoScalingConfig autoScalingConfig = ocmh.overseer.getClusterDataProvider().getAutoScalingConfig(); + AutoScalingConfig autoScalingConfig = ocmh.zkStateReader.getAutoScalingConfig(); if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) { log.debug("Identify nodes using default"); @@ -296,7 +295,7 @@ public class Assign { PolicyHelper.SESSION_REF.set(ocmh.policySessionRef); try { return getPositionsUsingPolicy(collectionName, - shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.overseer.getClusterDataProvider(), nodeList); + shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.zkStateReader, nodeList); } finally { PolicyHelper.SESSION_REF.remove(); } @@ -325,7 +324,7 @@ public class Assign { // could be created on live nodes given maxShardsPerNode, Replication factor (if from createShard) etc. public static List getNodesForNewReplicas(ClusterState clusterState, String collectionName, String shard, int nrtReplicas, - Object createNodeSet, ClusterDataProvider cdp, CoreContainer cc) throws IOException, InterruptedException { + Object createNodeSet, CoreContainer cc) throws KeeperException, InterruptedException { log.debug("getNodesForNewReplicas() shard: {} , replicas : {} , createNodeSet {}", shard, nrtReplicas, createNodeSet ); DocCollection coll = clusterState.getCollection(collectionName); Integer maxShardsPerNode = coll.getMaxShardsPerNode(); @@ -357,14 +356,13 @@ public class Assign { List l = (List) coll.get(DocCollection.RULE); List replicaPositions = null; if (l != null) { - // TODO nocommit: make it so that this method doesn't require access to CC replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cc, coll, createNodeList, l); } String policyName = coll.getStr(POLICY); - AutoScalingConfig autoScalingConfig = cdp.getAutoScalingConfig(); + AutoScalingConfig autoScalingConfig = cc.getZkController().zkStateReader.getAutoScalingConfig(); if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) { replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0, - policyName, cdp, createNodeList); + policyName, cc.getZkController().zkStateReader, createNodeList); } if(replicaPositions != null){ @@ -385,18 +383,21 @@ public class Assign { int nrtReplicas, int tlogReplicas, int pullReplicas, - String policyName, ClusterDataProvider cdp, - List nodesList) throws IOException, InterruptedException { + String policyName, ZkStateReader zkStateReader, + List nodesList) throws KeeperException, InterruptedException { log.debug("shardnames {} NRT {} TLOG {} PULL {} , policy {}, nodeList {}", shardNames, nrtReplicas, tlogReplicas, pullReplicas, policyName, nodesList); SolrClientDataProvider clientDataProvider = null; List replicaPositions = null; - AutoScalingConfig autoScalingConfig = cdp.getAutoScalingConfig(); - try { + AutoScalingConfig autoScalingConfig = zkStateReader.getAutoScalingConfig(); + try (CloudSolrClient csc = new CloudSolrClient.Builder() + .withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader)) + .build()) { + clientDataProvider = new SolrClientDataProvider(csc); Map kvMap = Collections.singletonMap(collName, policyName); replicaPositions = PolicyHelper.getReplicaLocations( collName, autoScalingConfig, - cdp, + clientDataProvider, kvMap, shardNames, nrtReplicas, @@ -404,7 +405,7 @@ public class Assign { pullReplicas, nodesList); return replicaPositions; - } catch (Exception e) { + } catch (IOException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing CloudSolrClient",e); } finally { if (log.isTraceEnabled()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java index a3264453906..21258e4aac5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java @@ -17,7 +17,6 @@ package org.apache.solr.cloud; -import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collections; @@ -28,7 +27,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableMap; -import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; import org.apache.solr.client.solrj.cloud.autoscaling.Policy; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd; @@ -105,7 +103,7 @@ public class CreateShardCmd implements Cmd { numPullReplicas); } else { List sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas, - createNodeSetStr, ocmh.overseer.getClusterDataProvider(), ocmh.overseer.getCoreContainer()); + createNodeSetStr, ocmh.overseer.getZkController().getCoreContainer()); int i = 0; positions = new ArrayList<>(); for (Map.Entry e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas, @@ -175,8 +173,8 @@ public class CreateShardCmd implements Cmd { } static boolean usePolicyFramework(DocCollection collection, OverseerCollectionMessageHandler ocmh) - throws IOException, InterruptedException { - AutoScalingConfig autoScalingConfig = ocmh.overseer.getClusterDataProvider().getAutoScalingConfig(); - return !autoScalingConfig.isEmpty() || collection.getPolicyName() != null; + throws KeeperException, InterruptedException { + Map autoScalingJson = Utils.getJson(ocmh.zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true); + return autoScalingJson.get(Policy.CLUSTER_POLICY) != null || collection.getPolicyName() != null; } } diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java index f13fed5d9f1..43bd6bd3f66 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java similarity index 96% rename from solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java rename to solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java index ac3cde345f0..cfd31445b63 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java @@ -30,7 +30,6 @@ import java.util.function.Predicate; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.SolrZkClient; @@ -44,11 +43,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A ZK-based distributed queue. Optimized for single-consumer, + * A distributed queue. Optimized for single-consumer, * multiple-producer: if there are multiple consumers on the same ZK queue, * the results should be correct but inefficient */ -public class ZkDistributedQueue implements DistributedQueue { +public class DistributedQueue { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); static final String PREFIX = "qn-"; @@ -93,11 +92,11 @@ public class ZkDistributedQueue implements DistributedQueue { private int watcherCount = 0; - public ZkDistributedQueue(SolrZkClient zookeeper, String dir) { + public DistributedQueue(SolrZkClient zookeeper, String dir) { this(zookeeper, dir, new Overseer.Stats()); } - public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) { + public DistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) { this.dir = dir; ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout()); @@ -120,7 +119,6 @@ public class ZkDistributedQueue implements DistributedQueue { * * @return data at the first element of the queue, or null. */ - @Override public byte[] peek() throws KeeperException, InterruptedException { Timer.Context time = stats.time(dir + "_peek"); try { @@ -137,7 +135,6 @@ public class ZkDistributedQueue implements DistributedQueue { * @param block if true, blocks until an element enters the queue * @return data at the first element of the queue, or null. */ - @Override public byte[] peek(boolean block) throws KeeperException, InterruptedException { return block ? peek(Long.MAX_VALUE) : peek(); } @@ -149,7 +146,6 @@ public class ZkDistributedQueue implements DistributedQueue { * @param wait max wait time in ms. * @return data at the first element of the queue, or null. */ - @Override public byte[] peek(long wait) throws KeeperException, InterruptedException { Preconditions.checkArgument(wait > 0); Timer.Context time; @@ -181,7 +177,6 @@ public class ZkDistributedQueue implements DistributedQueue { * * @return Head of the queue or null. */ - @Override public byte[] poll() throws KeeperException, InterruptedException { Timer.Context time = stats.time(dir + "_poll"); try { @@ -196,7 +191,6 @@ public class ZkDistributedQueue implements DistributedQueue { * * @return The former head of the queue */ - @Override public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException { Timer.Context time = stats.time(dir + "_remove"); try { @@ -215,7 +209,6 @@ public class ZkDistributedQueue implements DistributedQueue { * * @return The former head of the queue */ - @Override public byte[] take() throws KeeperException, InterruptedException { // Same as for element. Should refactor this. Timer.Context timer = stats.time(dir + "_take"); @@ -238,7 +231,6 @@ public class ZkDistributedQueue implements DistributedQueue { * Inserts data into queue. If there are no other queue consumers, the offered element * will be immediately visible when this method returns. */ - @Override public void offer(byte[] data) throws KeeperException, InterruptedException { Timer.Context time = stats.time(dir + "_offer"); try { @@ -334,8 +326,7 @@ public class ZkDistributedQueue implements DistributedQueue { *

* Package-private to support {@link OverseerTaskQueue} specifically. */ - @Override - public Collection> peekElements(int max, long waitMillis, Predicate acceptFilter) throws KeeperException, InterruptedException { + Collection> peekElements(int max, long waitMillis, Predicate acceptFilter) throws KeeperException, InterruptedException { List foundChildren = new ArrayList<>(); long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis); boolean first = true; diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 56297afe560..491ae00def5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -223,11 +223,7 @@ class ShardLeaderElectionContextBase extends ElectionContext { ZkStateReader.CORE_NAME_PROP, leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString()); - try { - Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m)); - } catch (Exception e) { - throw new IOException("Overseer state update queue error", e); - } + Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m)); } public LeaderElector getLeaderElector() { @@ -316,11 +312,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { // clear the leader in clusterstate ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection); - try { - Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m)); - } catch (Exception e) { - throw new IOException("Overseer state update queue error", e); - } + Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m)); boolean allReplicasInLine = false; if (!weAreReplacement) { @@ -502,7 +494,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } } - public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception { + public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws KeeperException, InterruptedException { if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) { ZkStateReader zkStateReader = zkController.getZkStateReader(); zkStateReader.forceUpdateCollection(collection); diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 22fbd14c271..e2493b792f6 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -30,15 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger; import com.codahale.metrics.Timer; import org.apache.solr.client.solrj.SolrResponse; -import org.apache.solr.client.solrj.cloud.DistributedQueue; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.impl.SolrClientDataProvider; -import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider; import org.apache.solr.cloud.autoscaling.AutoScaling; import org.apache.solr.cloud.autoscaling.AutoScalingHandler; import org.apache.solr.cloud.autoscaling.OverseerTriggerThread; -import org.apache.solr.cloud.autoscaling.ZkDistributedQueueFactory; import org.apache.solr.cloud.overseer.ClusterStateMutator; import org.apache.solr.cloud.overseer.CollectionMutator; import org.apache.solr.cloud.overseer.NodeMutator; @@ -59,7 +53,6 @@ import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.ObjectReleaseTracker; import org.apache.solr.common.util.Utils; import org.apache.solr.core.CloudConfig; -import org.apache.solr.core.CoreContainer; import org.apache.solr.handler.admin.CollectionsHandler; import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.request.LocalSolrQueryRequest; @@ -95,10 +88,10 @@ public class Overseer implements Closeable { private final SolrZkClient zkClient; private final String myId; //queue where everybody can throw tasks - private final ZkDistributedQueue stateUpdateQueue; + private final DistributedQueue stateUpdateQueue; //Internal queue where overseer stores events that have not yet been published into cloudstate //If Overseer dies while extracting the main queue a new overseer will start from this queue - private final ZkDistributedQueue workQueue; + private final DistributedQueue workQueue; // Internal map which holds the information about running tasks. private final DistributedMap runningMap; // Internal map which holds the information about successfully completed tasks. @@ -545,8 +538,7 @@ public class Overseer implements Closeable { autoscalingTriggerCreator.start(); ThreadGroup triggerThreadGroup = new ThreadGroup("Overseer autoscaling triggers"); - OverseerTriggerThread trigger = new OverseerTriggerThread(zkController.getCoreContainer().getResourceLoader(), - zkController.getClusterDataProvider()); + OverseerTriggerThread trigger = new OverseerTriggerThread(zkController); triggerThread = new OverseerThread(triggerThreadGroup, trigger, "OverseerAutoScalingTriggerThread-" + id); updaterThread.start(); @@ -562,15 +554,7 @@ public class Overseer implements Closeable { ZkController getZkController(){ return zkController; } - - public CoreContainer getCoreContainer() { - return zkController.getCoreContainer(); - } - - public ClusterDataProvider getClusterDataProvider() { - return zkController.getClusterDataProvider(); - } - + /** * For tests. * @@ -695,7 +679,7 @@ public class Overseer implements Closeable { * This method will create the /overseer znode in ZooKeeper if it does not exist already. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link ZkDistributedQueue} object + * @return a {@link DistributedQueue} object */ public static DistributedQueue getStateUpdateQueue(final SolrZkClient zkClient) { return getStateUpdateQueue(zkClient, new Stats()); @@ -708,11 +692,11 @@ public class Overseer implements Closeable { * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue * @param zkStats a {@link Overseer.Stats} object which tracks statistics for all zookeeper operations performed by this queue - * @return a {@link ZkDistributedQueue} object + * @return a {@link DistributedQueue} object */ - static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) { + static DistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) { createOverseerNode(zkClient); - return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats); + return new DistributedQueue(zkClient, "/overseer/queue", zkStats); } /** @@ -728,11 +712,11 @@ public class Overseer implements Closeable { * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue * @param zkStats a {@link Overseer.Stats} object which tracks statistics for all zookeeper operations performed by this queue - * @return a {@link ZkDistributedQueue} object + * @return a {@link DistributedQueue} object */ - static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) { + static DistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) { createOverseerNode(zkClient); - return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats); + return new DistributedQueue(zkClient, "/overseer/queue-work", zkStats); } /* Internal map for failed tasks, not to be used outside of the Overseer */ @@ -766,7 +750,7 @@ public class Overseer implements Closeable { * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link ZkDistributedQueue} object + * @return a {@link DistributedQueue} object */ static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) { return getCollectionQueue(zkClient, new Stats()); @@ -784,7 +768,7 @@ public class Overseer implements Closeable { * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link ZkDistributedQueue} object + * @return a {@link DistributedQueue} object */ static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) { createOverseerNode(zkClient); @@ -804,7 +788,7 @@ public class Overseer implements Closeable { * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link ZkDistributedQueue} object + * @return a {@link DistributedQueue} object */ static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) { return getConfigSetQueue(zkClient, new Stats()); @@ -827,7 +811,7 @@ public class Overseer implements Closeable { * {@link OverseerConfigSetMessageHandler}. * * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue - * @return a {@link ZkDistributedQueue} object + * @return a {@link DistributedQueue} object */ static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) { // For now, we use the same queue as the collection queue, but ensure diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java index f1bcebe6c21..577cf641c1b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java @@ -36,7 +36,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.commons.lang.StringUtils; import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException; @@ -322,7 +321,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler } private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results) - throws Exception { + throws KeeperException, InterruptedException { checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP); SolrZkClient zkClient = zkStateReader.getZkClient(); DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient); @@ -333,7 +332,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler inQueue.offer(Utils.toJSON(m)); } - private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { + private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException { if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP))) { throw new SolrException(ErrorCode.BAD_REQUEST, "The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP + @@ -442,7 +441,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler } } - void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception { + void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException { ZkNodeProps m = new ZkNodeProps( Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, core, @@ -463,7 +462,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler } //TODO should we not remove in the next release ? - private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results) throws Exception { + private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results) + throws KeeperException, InterruptedException { final String collectionName = message.getStr(COLLECTION_PROP); boolean firstLoop = true; @@ -634,7 +634,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) - throws Exception { + throws KeeperException, InterruptedException { final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); //the rest of the processing is based on writing cluster state properties @@ -712,7 +712,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler } ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) - throws Exception { + throws KeeperException, InterruptedException { return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete); } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java index f0e3676fb01..7db13156638 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java @@ -56,7 +56,7 @@ public class OverseerNodePrioritizer { this.shardHandlerFactory = shardHandlerFactory; } - public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception { + public synchronized void prioritizeOverseerNodes(String overseerId) throws KeeperException, InterruptedException { SolrZkClient zk = zkStateReader.getZkClient(); if(!zk.exists(ZkStateReader.ROLES,true))return; Map m = (Map) Utils.fromJSON(zk.getData(ZkStateReader.ROLES, null, new Stat(), true)); diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java index b381c3b1c7f..92e34cfe498 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java @@ -35,11 +35,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A {@link ZkDistributedQueue} augmented with helper methods specific to the overseer task queues. + * A {@link DistributedQueue} augmented with helper methods specific to the overseer task queues. * Methods specific to this subclass ignore superclass internal state and hit ZK directly. * This is inefficient! But the API on this class is kind of muddy.. */ -public class OverseerTaskQueue extends ZkDistributedQueue { +public class OverseerTaskQueue extends DistributedQueue { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final String RESPONSE_PREFIX = "qnr-" ; diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 8e0977bbbee..8a6b99b2c95 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -174,7 +174,7 @@ public class RecoveryStrategy implements Runnable, Closeable { final private void recoveryFailed(final SolrCore core, final ZkController zkController, final String baseUrl, - final String shardZkNodeName, final CoreDescriptor cd) throws Exception { + final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException { SolrException.log(LOG, "Recovery failed - I give up."); try { zkController.publish(cd, Replica.State.RECOVERY_FAILED); @@ -297,7 +297,7 @@ public class RecoveryStrategy implements Runnable, Closeable { } } - final public void doRecovery(SolrCore core) throws Exception { + final public void doRecovery(SolrCore core) throws KeeperException, InterruptedException { if (core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog()) { doSyncOrReplicateRecovery(core); } else { @@ -440,7 +440,7 @@ public class RecoveryStrategy implements Runnable, Closeable { } // TODO: perhaps make this grab a new core each time through the loop to handle core reloads? - final public void doSyncOrReplicateRecovery(SolrCore core) throws Exception { + final public void doSyncOrReplicateRecovery(SolrCore core) throws KeeperException, InterruptedException { boolean replayed = false; boolean successfulRecovery = false; diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java index f25a3cf7786..9edf7298867 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java @@ -32,8 +32,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.cloud.ReplicaPosition; diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java index afae09d7ea5..3bd6d8965cf 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd; diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 293e9d3f41b..709d2ed1827 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -51,16 +51,10 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Strings; import org.apache.commons.lang.StringUtils; -import org.apache.solr.client.solrj.cloud.DistributedQueue; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; -import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder; -import org.apache.solr.client.solrj.impl.SolrClientDataProvider; -import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider; import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; -import org.apache.solr.cloud.autoscaling.ZkDistributedQueueFactory; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.cloud.overseer.SliceMutator; import org.apache.solr.common.SolrException; @@ -197,8 +191,6 @@ public class ZkController { private final SolrZkClient zkClient; private final ZkCmdExecutor cmdExecutor; public final ZkStateReader zkStateReader; - private ClusterDataProvider clusterDataProvider; - private CloudSolrClient cloudSolrClient; private final String zkServerAddress; // example: 127.0.0.1:54062/solr @@ -443,7 +435,7 @@ public class ZkController { }); init(registerOnReconnect); - + assert ObjectReleaseTracker.track(this); } @@ -562,12 +554,6 @@ public class ZkController { IOUtils.closeQuietly(overseerElector.getContext()); IOUtils.closeQuietly(overseer); } finally { - if (cloudSolrClient != null) { - IOUtils.closeQuietly(cloudSolrClient); - } - if (clusterDataProvider != null) { - IOUtils.closeQuietly(clusterDataProvider); - } try { try { zkStateReader.close(); @@ -602,22 +588,6 @@ public class ZkController { return zkStateReader.getClusterState(); } - public ClusterDataProvider getClusterDataProvider() { - if (clusterDataProvider != null) { - return clusterDataProvider; - } - synchronized(this) { - if (clusterDataProvider != null) { - return clusterDataProvider; - } - cloudSolrClient = new CloudSolrClient.Builder() - .withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader)) - .build(); - clusterDataProvider = new SolrClientDataProvider(new ZkDistributedQueueFactory(zkClient), cloudSolrClient); - } - return clusterDataProvider; - } - /** * Returns config file data (in bytes) */ @@ -1320,18 +1290,18 @@ public class ZkController { return baseURL; } - public void publish(final CoreDescriptor cd, final Replica.State state) throws Exception { + public void publish(final CoreDescriptor cd, final Replica.State state) throws KeeperException, InterruptedException { publish(cd, state, true); } - public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception { + public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws KeeperException, InterruptedException { publish(cd, state, updateLastState, false); } /** * Publish core state to overseer. */ - public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState, boolean forcePublish) throws Exception { + public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState, boolean forcePublish) throws KeeperException, InterruptedException { if (!forcePublish) { try (SolrCore core = cc.getCore(cd.getName())) { if (core == null || core.isClosed()) { @@ -1440,7 +1410,7 @@ public class ZkController { return true; } - public void unregister(String coreName, CoreDescriptor cd) throws Exception { + public void unregister(String coreName, CoreDescriptor cd) throws InterruptedException, KeeperException { final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); final String collection = cd.getCloudDescriptor().getCollectionName(); @@ -1471,7 +1441,8 @@ public class ZkController { overseerJobQueue.offer(Utils.toJSON(m)); } - public void createCollection(String collection) throws Exception { + public void createCollection(String collection) throws KeeperException, + InterruptedException { ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), ZkStateReader.NODE_NAME_PROP, getNodeName(), ZkStateReader.COLLECTION_PROP, collection); @@ -1596,9 +1567,6 @@ public class ZkController { Thread.currentThread().interrupt(); log.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); - } catch (Exception e) { - log.error("", e); - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e); } if (cd.getCloudDescriptor().getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getClusterState(), coreNodeName)) { diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ActionContext.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ActionContext.java index ede855d7458..178a972a200 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ActionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ActionContext.java @@ -20,7 +20,6 @@ package org.apache.solr.cloud.autoscaling; import java.io.IOException; import java.util.Map; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.common.MapWriter; import org.apache.solr.core.CoreContainer; @@ -31,18 +30,18 @@ import org.apache.solr.core.CoreContainer; */ public class ActionContext implements MapWriter { - private final ClusterDataProvider clusterDataProvider; + private final CoreContainer coreContainer; private final AutoScaling.Trigger source; private final Map properties; - public ActionContext(ClusterDataProvider clusterDataProvider, AutoScaling.Trigger source, Map properties) { - this.clusterDataProvider = clusterDataProvider; + public ActionContext(CoreContainer coreContainer, AutoScaling.Trigger source, Map properties) { + this.coreContainer = coreContainer; this.source = source; this.properties = properties; } - public ClusterDataProvider getClusterDataProvider() { - return clusterDataProvider; + public CoreContainer getCoreContainer() { + return coreContainer; } public AutoScaling.Trigger getSource() { diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java index 5e317f4e8a0..ff0946979ee 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java @@ -18,12 +18,8 @@ package org.apache.solr.cloud.autoscaling; -import java.io.IOException; - -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester; import org.apache.solr.client.solrj.cloud.autoscaling.Policy; -import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.ZkStateReader; @@ -31,20 +27,15 @@ import org.apache.solr.common.cloud.ZkStateReader; public class AutoAddReplicasPlanAction extends ComputePlanAction { @Override - protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ClusterDataProvider cdp) { + protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ZkStateReader zkStateReader) { // for backward compatibility - String autoAddReplicas = cdp.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null); + String autoAddReplicas = zkStateReader.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null); if (autoAddReplicas != null && autoAddReplicas.equals("false")) { return new NoneSuggester(); } - Policy.Suggester suggester = super.getSuggester(session, event, cdp); - ClusterState clusterState; - try { - clusterState = cdp.getClusterState(); - } catch (IOException e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception getting cluster state", e); - } + Policy.Suggester suggester = super.getSuggester(session, event, zkStateReader); + ClusterState clusterState = zkStateReader.getClusterState(); boolean anyCollections = false; for (DocCollection collection: clusterState.getCollectionsMap().values()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java index f649d39fc5f..ed24bf7880c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java @@ -24,10 +24,8 @@ import java.util.Map; import com.google.common.base.Preconditions; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.core.CoreContainer; -import org.apache.solr.core.SolrResourceLoader; public class AutoScaling { @@ -112,13 +110,30 @@ public class AutoScaling { void init(); } - /** - * Factory to produce instances of {@link Trigger}. - */ - public static abstract class TriggerFactory implements Closeable { - protected boolean isClosed = false; + public static class TriggerFactory implements Closeable { - public abstract Trigger create(TriggerEventType type, String name, Map props); + private final CoreContainer coreContainer; + + private boolean isClosed = false; + + public TriggerFactory(CoreContainer coreContainer) { + Preconditions.checkNotNull(coreContainer); + this.coreContainer = coreContainer; + } + + public synchronized Trigger create(TriggerEventType type, String name, Map props) { + if (isClosed) { + throw new AlreadyClosedException("TriggerFactory has already been closed, cannot create new triggers"); + } + switch (type) { + case NODEADDED: + return new NodeAddedTrigger(name, props, coreContainer); + case NODELOST: + return new NodeLostTrigger(name, props, coreContainer); + default: + throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name); + } + } @Override public void close() throws IOException { @@ -128,38 +143,6 @@ public class AutoScaling { } } - /** - * Default implementation of {@link TriggerFactory}. - */ - public static class TriggerFactoryImpl extends TriggerFactory { - - private final ClusterDataProvider clusterDataProvider; - private final SolrResourceLoader loader; - - public TriggerFactoryImpl(SolrResourceLoader loader, ClusterDataProvider clusterDataProvider) { - Preconditions.checkNotNull(clusterDataProvider); - Preconditions.checkNotNull(loader); - this.clusterDataProvider = clusterDataProvider; - this.loader = loader; - } - - @Override - public synchronized Trigger create(TriggerEventType type, String name, Map props) { - if (isClosed) { - throw new AlreadyClosedException("TriggerFactory has already been closed, cannot create new triggers"); - } - switch (type) { - case NODEADDED: - return new NodeAddedTrigger(name, props, loader, clusterDataProvider); - case NODELOST: - return new NodeLostTrigger(name, props, loader, clusterDataProvider); - default: - throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name); - } - } - - } - public static final String AUTO_ADD_REPLICAS_TRIGGER_DSL = "{" + " 'set-trigger' : {" + diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java index 5a6af701dc0..a5d1bc7dd64 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java @@ -214,7 +214,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission try (CloudSolrClient build = new CloudSolrClient.Builder() .withHttpClient(container.getUpdateShardHandler().getHttpClient()) .withZkHost(container.getZkController().getZkServerAddress()).build()) { - Policy.Session session = policy.createSession(new SolrClientDataProvider(new ZkDistributedQueueFactory(container.getZkController().getZkClient()), build)); + Policy.Session session = policy.createSession(new SolrClientDataProvider(build)); List sorted = session.getSorted(); List violations = session.getViolations(); @@ -638,8 +638,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission try (CloudSolrClient build = new CloudSolrClient.Builder() .withHttpClient(container.getUpdateShardHandler().getHttpClient()) .withZkHost(container.getZkController().getZkServerAddress()).build()) { - Policy.Session session = autoScalingConf.getPolicy() - .createSession(new SolrClientDataProvider(new ZkDistributedQueueFactory(container.getZkController().getZkClient()), build)); + Policy.Session session = autoScalingConf.getPolicy().createSession(new SolrClientDataProvider(build)); log.debug("Verified autoscaling configuration"); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java index 6fd0dbede6a..93441fec924 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java @@ -25,7 +25,6 @@ import java.util.Map; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.Policy; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.SolrClientDataProvider; @@ -49,30 +48,40 @@ public class ComputePlanAction extends TriggerActionBase { @Override public void process(TriggerEvent event, ActionContext context) { log.debug("-- processing event: {} with context properties: {}", event, context.getProperties()); - ClusterDataProvider cdp = context.getClusterDataProvider(); + CoreContainer container = context.getCoreContainer(); try { - AutoScalingConfig autoScalingConf = cdp.getAutoScalingConfig(); - if (autoScalingConf.isEmpty()) { - log.error("Action: " + getName() + " executed but no policy is configured"); - return; - } - Policy policy = autoScalingConf.getPolicy(); - Policy.Session session = policy.createSession(cdp); - Policy.Suggester suggester = getSuggester(session, event, cdp); - while (true) { - SolrRequest operation = suggester.getOperation(); - if (operation == null) break; - log.info("Computed Plan: {}", operation.getParams()); - Map props = context.getProperties(); - props.compute("operations", (k, v) -> { - List operations = (List) v; - if (operations == null) operations = new ArrayList<>(); - operations.add(operation); - return operations; - }); - session = suggester.getSession(); - suggester = getSuggester(session, event, cdp); + try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder() + .withZkHost(container.getZkController().getZkServerAddress()) + .withHttpClient(container.getUpdateShardHandler().getHttpClient()) + .build()) { + ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader(); + AutoScalingConfig autoScalingConf = zkStateReader.getAutoScalingConfig(); + if (autoScalingConf.isEmpty()) { + log.error("Action: " + getName() + " executed but no policy is configured"); + return; + } + Policy policy = autoScalingConf.getPolicy(); + Policy.Session session = policy.createSession(new SolrClientDataProvider(cloudSolrClient)); + Policy.Suggester suggester = getSuggester(session, event, zkStateReader); + while (true) { + SolrRequest operation = suggester.getOperation(); + if (operation == null) break; + log.info("Computed Plan: {}", operation.getParams()); + Map props = context.getProperties(); + props.compute("operations", (k, v) -> { + List operations = (List) v; + if (operations == null) operations = new ArrayList<>(); + operations.add(operation); + return operations; + }); + session = suggester.getSession(); + suggester = getSuggester(session, event, zkStateReader); + } } + } catch (KeeperException e) { + log.error("ZooKeeperException while processing event: " + event, e); + } catch (InterruptedException e) { + log.error("Interrupted while processing event: " + event, e); } catch (IOException e) { log.error("IOException while processing event: " + event, e); } catch (Exception e) { @@ -80,7 +89,7 @@ public class ComputePlanAction extends TriggerActionBase { } } - protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ClusterDataProvider cdp) { + protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ZkStateReader zkStateReader) { Policy.Suggester suggester; switch (event.getEventType()) { case NODEADDED: diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java index 01c25c0dca9..2cd982403fe 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.common.SolrException; @@ -44,29 +43,32 @@ public class ExecutePlanAction extends TriggerActionBase { @Override public void process(TriggerEvent event, ActionContext context) { log.debug("-- processing event: {} with context properties: {}", event, context.getProperties()); - ClusterDataProvider clusterDataProvider = context.getClusterDataProvider(); + CoreContainer container = context.getCoreContainer(); List operations = (List) context.getProperty("operations"); if (operations == null || operations.isEmpty()) { log.info("No operations to execute for event: {}", event); return; } - try { + try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder() + .withZkHost(container.getZkController().getZkServerAddress()) + .withHttpClient(container.getUpdateShardHandler().getHttpClient()) + .build()) { for (SolrRequest operation : operations) { log.info("Executing operation: {}", operation.getParams()); try { - SolrResponse response = clusterDataProvider.request(operation); + SolrResponse response = operation.process(cloudSolrClient); context.getProperties().compute("responses", (s, o) -> { List> responses = (List>) o; if (responses == null) responses = new ArrayList<>(operations.size()); responses.add(response.getResponse()); return responses; }); - } catch (Exception e) { + } catch (SolrServerException | HttpSolrClient.RemoteSolrException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unexpected exception executing operation: " + operation.getParams(), e); } } - } catch (Exception e) { + } catch (IOException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unexpected IOException while processing event: " + event, e); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java index dadc3afd096..2af4f30b038 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java @@ -30,7 +30,6 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.entity.StringEntity; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage; import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.common.util.Utils; @@ -73,9 +72,9 @@ public class HttpTriggerListener extends TriggerListenerBase { private boolean followRedirects; @Override - public void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) { - super.init(clusterDataProvider, config); - httpClient = clusterDataProvider.getHttpClient(); + public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) { + super.init(coreContainer, config); + httpClient = coreContainer.getUpdateShardHandler().getHttpClient(); urlTemplate = (String)config.properties.get("url"); payloadTemplate = (String)config.properties.get("payload"); contentType = (String)config.properties.get("contentType"); diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java index 387458cc902..0c0278b12b2 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java @@ -17,24 +17,29 @@ package org.apache.solr.cloud.autoscaling; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; -import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; +import org.apache.lucene.util.IOUtils; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.core.SolrResourceLoader; +import org.apache.solr.core.CoreContainer; import org.apache.solr.util.TimeSource; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,28 +49,60 @@ import org.slf4j.LoggerFactory; public class NodeAddedTrigger extends TriggerBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final String name; + private final Map properties; + private final CoreContainer container; + private final List actions; + private final AtomicReference processorRef; + private final boolean enabled; + private final int waitForSecond; + private final TriggerEventType eventType; private final TimeSource timeSource; + private boolean isClosed = false; + private Set lastLiveNodes; private Map nodeNameVsTimeAdded = new HashMap<>(); public NodeAddedTrigger(String name, Map properties, - SolrResourceLoader loader, - ClusterDataProvider clusterDataProvider) { - super(name, properties, loader, clusterDataProvider); + CoreContainer container) { + super(container.getZkController().getZkClient()); + this.name = name; + this.properties = properties; + this.container = container; this.timeSource = TimeSource.CURRENT_TIME; - lastLiveNodes = new HashSet<>(clusterDataProvider.getLiveNodes()); + this.processorRef = new AtomicReference<>(); + List> o = (List>) properties.get("actions"); + if (o != null && !o.isEmpty()) { + actions = new ArrayList<>(3); + for (Map map : o) { + TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class); + actions.add(action); + } + } else { + actions = Collections.emptyList(); + } + lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes()); log.debug("Initial livenodes: {}", lastLiveNodes); + this.enabled = Boolean.parseBoolean(String.valueOf(properties.getOrDefault("enabled", "true"))); + this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue(); + this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT)); log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties); } @Override public void init() { - super.init(); + List> o = (List>) properties.get("actions"); + if (o != null && !o.isEmpty()) { + for (int i = 0; i < o.size(); i++) { + Map map = o.get(i); + actions.get(i).init(map); + } + } // pick up added nodes for which marker paths were created try { - List added = clusterDataProvider.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH); + List added = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true); added.forEach(n -> { // don't add nodes that have since gone away if (lastLiveNodes.contains(n)) { @@ -74,14 +111,77 @@ public class NodeAddedTrigger extends TriggerBase { } removeMarker(n); }); - } catch (NoSuchElementException e) { + } catch (KeeperException.NoNodeException e) { // ignore - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { log.warn("Exception retrieving nodeLost markers", e); } } + @Override + public void setProcessor(AutoScaling.TriggerEventProcessor processor) { + processorRef.set(processor); + } + + @Override + public AutoScaling.TriggerEventProcessor getProcessor() { + return processorRef.get(); + } + + @Override + public String getName() { + return name; + } + + @Override + public TriggerEventType getEventType() { + return eventType; + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public int getWaitForSecond() { + return waitForSecond; + } + + @Override + public Map getProperties() { + return properties; + } + + @Override + public List getActions() { + return actions; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof NodeAddedTrigger) { + NodeAddedTrigger that = (NodeAddedTrigger) obj; + return this.name.equals(that.name) + && this.properties.equals(that.properties); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(name, properties); + } + + @Override + public void close() throws IOException { + synchronized (this) { + isClosed = true; + IOUtils.closeWhileHandlingException(actions); + } + } + @Override public void restoreState(AutoScaling.Trigger old) { assert old.isClosed(); @@ -129,7 +229,8 @@ public class NodeAddedTrigger extends TriggerBase { } log.debug("Running NodeAddedTrigger {}", name); - Set newLiveNodes = new HashSet<>(clusterDataProvider.getLiveNodes()); + ZkStateReader reader = container.getZkController().getZkStateReader(); + Set newLiveNodes = reader.getClusterState().getLiveNodes(); log.debug("Found livenodes: {}", newLiveNodes); // have any nodes that we were tracking been removed from the cluster? @@ -186,17 +287,24 @@ public class NodeAddedTrigger extends TriggerBase { private void removeMarker(String nodeName) { String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName; try { - if (clusterDataProvider.hasData(path)) { - clusterDataProvider.removeData(path, -1); + if (container.getZkController().getZkClient().exists(path, true)) { + container.getZkController().getZkClient().delete(path, -1, true); } - } catch (NoSuchElementException e) { + } catch (KeeperException.NoNodeException e) { // ignore - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { log.debug("Exception removing nodeAdded marker " + nodeName, e); } } + @Override + public boolean isClosed() { + synchronized (this) { + return isClosed; + } + } + public static class NodeAddedEvent extends TriggerEvent { public NodeAddedEvent(TriggerEventType eventType, String source, List times, List nodeNames) { diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java index d1e15f7dbf3..18dafcbbaee 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java @@ -17,24 +17,29 @@ package org.apache.solr.cloud.autoscaling; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; -import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; +import org.apache.lucene.util.IOUtils; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.core.SolrResourceLoader; +import org.apache.solr.core.CoreContainer; import org.apache.solr.util.TimeSource; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,27 +49,59 @@ import org.slf4j.LoggerFactory; public class NodeLostTrigger extends TriggerBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final String name; + private final Map properties; + private final CoreContainer container; + private final List actions; + private final AtomicReference processorRef; + private final boolean enabled; + private final int waitForSecond; + private final TriggerEventType eventType; private final TimeSource timeSource; + private boolean isClosed = false; + private Set lastLiveNodes; private Map nodeNameVsTimeRemoved = new HashMap<>(); public NodeLostTrigger(String name, Map properties, - SolrResourceLoader loader, - ClusterDataProvider clusterDataProvider) { - super(name, properties, loader, clusterDataProvider); + CoreContainer container) { + super(container.getZkController().getZkClient()); + this.name = name; + this.properties = properties; + this.container = container; this.timeSource = TimeSource.CURRENT_TIME; - lastLiveNodes = new HashSet<>(clusterDataProvider.getLiveNodes()); + this.processorRef = new AtomicReference<>(); + List> o = (List>) properties.get("actions"); + if (o != null && !o.isEmpty()) { + actions = new ArrayList<>(3); + for (Map map : o) { + TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class); + actions.add(action); + } + } else { + actions = Collections.emptyList(); + } + lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes()); log.debug("Initial livenodes: {}", lastLiveNodes); + this.enabled = Boolean.parseBoolean(String.valueOf(properties.getOrDefault("enabled", "true"))); + this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue(); + this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT)); } @Override public void init() { - super.init(); + List> o = (List>) properties.get("actions"); + if (o != null && !o.isEmpty()) { + for (int i = 0; i < o.size(); i++) { + Map map = o.get(i); + actions.get(i).init(map); + } + } // pick up lost nodes for which marker paths were created try { - List lost = clusterDataProvider.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH); + List lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true); lost.forEach(n -> { // don't add nodes that have since came back if (!lastLiveNodes.contains(n)) { @@ -73,18 +110,76 @@ public class NodeLostTrigger extends TriggerBase { } removeMarker(n); }); - } catch (NoSuchElementException e) { + } catch (KeeperException.NoNodeException e) { // ignore - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { log.warn("Exception retrieving nodeLost markers", e); } } + @Override + public void setProcessor(AutoScaling.TriggerEventProcessor processor) { + processorRef.set(processor); + } + + @Override + public AutoScaling.TriggerEventProcessor getProcessor() { + return processorRef.get(); + } + + @Override + public String getName() { + return name; + } + + @Override + public TriggerEventType getEventType() { + return eventType; + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public int getWaitForSecond() { + return waitForSecond; + } + + @Override + public Map getProperties() { + return properties; + } + @Override public List getActions() { return actions; } + @Override + public boolean equals(Object obj) { + if (obj instanceof NodeLostTrigger) { + NodeLostTrigger that = (NodeLostTrigger) obj; + return this.name.equals(that.name) + && this.properties.equals(that.properties); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(name, properties); + } + + @Override + public void close() throws IOException { + synchronized (this) { + isClosed = true; + IOUtils.closeWhileHandlingException(actions); + } + } + @Override public void restoreState(AutoScaling.Trigger old) { assert old.isClosed(); @@ -131,7 +226,8 @@ public class NodeLostTrigger extends TriggerBase { } } - Set newLiveNodes = new HashSet<>(clusterDataProvider.getLiveNodes()); + ZkStateReader reader = container.getZkController().getZkStateReader(); + Set newLiveNodes = reader.getClusterState().getLiveNodes(); log.debug("Running NodeLostTrigger: {} with currently live nodes: {}", name, newLiveNodes); // have any nodes that we were tracking been added to the cluster? @@ -190,16 +286,23 @@ public class NodeLostTrigger extends TriggerBase { private void removeMarker(String nodeName) { String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName; try { - if (clusterDataProvider.hasData(path)) { - clusterDataProvider.removeData(path, -1); + if (container.getZkController().getZkClient().exists(path, true)) { + container.getZkController().getZkClient().delete(path, -1, true); } - } catch (NoSuchElementException e) { + } catch (KeeperException.NoNodeException e) { // ignore - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { log.warn("Exception removing nodeLost marker " + nodeName, e); } } + @Override + public boolean isClosed() { + synchronized (this) { + return isClosed; + } + } + public static class NodeLostEvent extends TriggerEvent { public NodeLostEvent(TriggerEventType eventType, String source, List times, List nodeNames) { diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java index b2bc3c30700..6b7aa9185dc 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java @@ -20,19 +20,16 @@ package org.apache.solr.cloud.autoscaling; import java.io.Closeable; import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.net.ConnectException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.store.AlreadyClosedException; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; @@ -40,7 +37,6 @@ import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.common.util.IOUtils; -import org.apache.solr.core.SolrResourceLoader; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -56,7 +52,11 @@ public class OverseerTriggerThread implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ClusterDataProvider clusterDataProvider; + private final ZkController zkController; + + private final ZkStateReader zkStateReader; + + private final SolrZkClient zkClient; private final ScheduledTriggers scheduledTriggers; @@ -77,10 +77,12 @@ public class OverseerTriggerThread implements Runnable, Closeable { private AutoScalingConfig autoScalingConfig; - public OverseerTriggerThread(SolrResourceLoader loader, ClusterDataProvider clusterDataProvider) { - this.clusterDataProvider = clusterDataProvider; - scheduledTriggers = new ScheduledTriggers(loader, clusterDataProvider); - triggerFactory = new AutoScaling.TriggerFactoryImpl(loader, clusterDataProvider); + public OverseerTriggerThread(ZkController zkController) { + this.zkController = zkController; + zkStateReader = zkController.getZkStateReader(); + zkClient = zkController.getZkClient(); + scheduledTriggers = new ScheduledTriggers(zkController); + triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer()); } @Override @@ -104,8 +106,11 @@ public class OverseerTriggerThread implements Runnable, Closeable { try { refreshAutoScalingConf(new AutoScalingWatcher()); - } catch (ConnectException e) { + } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) { log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage()); + } catch (KeeperException e) { + log.error("A ZK error has occurred", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); @@ -196,26 +201,26 @@ public class OverseerTriggerThread implements Runnable, Closeable { if (cleanOldNodeLostMarkers) { log.debug("-- clean old nodeLost markers"); try { - List markers = clusterDataProvider.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH); + List markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true); markers.forEach(n -> { removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n); }); - } catch (NoSuchElementException e) { + } catch (KeeperException.NoNodeException e) { // ignore - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { log.warn("Error removing old nodeLost markers", e); } } if (cleanOldNodeAddedMarkers) { log.debug("-- clean old nodeAdded markers"); try { - List markers = clusterDataProvider.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH); + List markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true); markers.forEach(n -> { removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n); }); - } catch (NoSuchElementException e) { + } catch (KeeperException.NoNodeException e) { // ignore - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { log.warn("Error removing old nodeAdded markers", e); } @@ -226,11 +231,11 @@ public class OverseerTriggerThread implements Runnable, Closeable { private void removeNodeMarker(String path, String nodeName) { path = path + "/" + nodeName; try { - clusterDataProvider.removeData(path, -1); + zkClient.delete(path, -1, true); log.debug(" -- deleted " + path); - } catch (NoSuchElementException e) { + } catch (KeeperException.NoNodeException e) { // ignore - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { log.warn("Error removing old marker " + path, e); } } @@ -245,8 +250,11 @@ public class OverseerTriggerThread implements Runnable, Closeable { try { refreshAutoScalingConf(this); - } catch (ConnectException e) { - log.warn("ZooKeeper watch triggered for autoscaling conf, but we cannot talk to ZK: [{}]", e.getMessage()); + } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) { + log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage()); + } catch (KeeperException e) { + log.error("A ZK error has occurred", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); @@ -258,13 +266,13 @@ public class OverseerTriggerThread implements Runnable, Closeable { } - private void refreshAutoScalingConf(Watcher watcher) throws ConnectException, InterruptedException, IOException { + private void refreshAutoScalingConf(Watcher watcher) throws KeeperException, InterruptedException { updateLock.lock(); try { if (isClosed) { return; } - AutoScalingConfig currentConfig = clusterDataProvider.getAutoScalingConfig(watcher); + AutoScalingConfig currentConfig = zkStateReader.getAutoScalingConfig(watcher); log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, currentConfig.getZkVersion()); if (znodeVersion >= currentConfig.getZkVersion()) { // protect against reordered watcher fires by ensuring that we only move forward diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java index 1d0f5d61f71..ec9ffc1c846 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java @@ -41,18 +41,15 @@ import java.util.stream.Collectors; import org.apache.lucene.store.AlreadyClosedException; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage; import org.apache.solr.cloud.ActionThrottle; import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.ZkController; -import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.IOUtils; import org.apache.solr.core.CoreContainer; -import org.apache.solr.core.SolrResourceLoader; import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; @@ -88,17 +85,17 @@ public class ScheduledTriggers implements Closeable { private final ActionThrottle actionThrottle; - private final ClusterDataProvider clusterDataProvider; - - private final SolrResourceLoader loader; + private final SolrZkClient zkClient; private final Overseer.Stats queueStats; + private final CoreContainer coreContainer; + private final TriggerListeners listeners; private AutoScalingConfig autoScalingConfig; - public ScheduledTriggers(SolrResourceLoader loader, ClusterDataProvider clusterDataProvider) { + public ScheduledTriggers(ZkController zkController) { // todo make the core pool size configurable // it is important to use more than one because a time taking trigger can starve other scheduled triggers // ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand @@ -111,8 +108,8 @@ public class ScheduledTriggers implements Closeable { actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor")); // todo make the wait time configurable actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS); - this.clusterDataProvider = clusterDataProvider; - this.loader = loader; + coreContainer = zkController.getCoreContainer(); + zkClient = zkController.getZkClient(); queueStats = new Overseer.Stats(); listeners = new TriggerListeners(); } @@ -139,12 +136,7 @@ public class ScheduledTriggers implements Closeable { if (isClosed) { throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore"); } - ScheduledTrigger scheduledTrigger; - try { - scheduledTrigger = new ScheduledTrigger(newTrigger, clusterDataProvider, queueStats); - } catch (IOException e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "exception creating scheduled trigger", e); - } + ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger, zkClient, queueStats); ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger); if (old != null) { if (old.trigger.equals(newTrigger)) { @@ -190,7 +182,7 @@ public class ScheduledTriggers implements Closeable { // let the action executor thread wait instead of the trigger thread so we use the throttle here actionThrottle.minimumWaitBetweenActions(); actionThrottle.markAttemptingAction(); - ActionContext actionContext = new ActionContext(clusterDataProvider, newTrigger, new HashMap<>()); + ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>()); for (TriggerAction action : actions) { List beforeActions = (List)actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.BEFORE_ACTION.toString(), k -> new ArrayList()); beforeActions.add(action.getName()); @@ -252,23 +244,23 @@ public class ScheduledTriggers implements Closeable { String statePath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName; String eventsPath = ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName; try { - if (clusterDataProvider.hasData(statePath)) { - clusterDataProvider.removeData(statePath, -1); + if (zkClient.exists(statePath, true)) { + zkClient.delete(statePath, -1, true); } - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { log.warn("Failed to remove state for removed trigger " + statePath, e); } try { - if (clusterDataProvider.hasData(eventsPath)) { - List events = clusterDataProvider.listData(eventsPath); + if (zkClient.exists(eventsPath, true)) { + List events = zkClient.getChildren(eventsPath, null, true); List ops = new ArrayList<>(events.size() + 1); events.forEach(ev -> { ops.add(Op.delete(eventsPath + "/" + ev, -1)); }); ops.add(Op.delete(eventsPath, -1)); - clusterDataProvider.multi(ops); + zkClient.multi(ops, true); } - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { log.warn("Failed to remove events for removed trigger " + eventsPath, e); } } @@ -305,9 +297,9 @@ public class ScheduledTriggers implements Closeable { boolean replay; volatile boolean isClosed; - ScheduledTrigger(AutoScaling.Trigger trigger, ClusterDataProvider clusterDataProvider, Overseer.Stats stats) throws IOException { + ScheduledTrigger(AutoScaling.Trigger trigger, SolrZkClient zkClient, Overseer.Stats stats) { this.trigger = trigger; - this.queue = new TriggerEventQueue(clusterDataProvider, trigger.getName(), stats); + this.queue = new TriggerEventQueue(zkClient, trigger.getName(), stats); this.replay = true; this.isClosed = false; } @@ -434,13 +426,13 @@ public class ScheduledTriggers implements Closeable { if (listener == null) { // create new instance String clazz = config.listenerClass; try { - listener = loader.newInstance(clazz, TriggerListener.class); + listener = coreContainer.getResourceLoader().newInstance(clazz, TriggerListener.class); } catch (Exception e) { log.warn("Invalid TriggerListener class name '" + clazz + "', skipping...", e); } if (listener != null) { try { - listener.init(clusterDataProvider, config); + listener.init(coreContainer, config); listenersPerName.put(config.name, listener); } catch (Exception e) { log.warn("Error initializing TriggerListener " + config, e); diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java index b18d5b73c96..a72b174b730 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java @@ -31,7 +31,6 @@ import java.util.StringJoiner; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.UpdateRequest; @@ -74,8 +73,8 @@ public class SystemLogListener extends TriggerListenerBase { private boolean enabled = true; @Override - public void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) { - super.init(clusterDataProvider, config); + public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) { + super.init(coreContainer, config); collection = (String)config.properties.getOrDefault(CollectionAdminParams.COLLECTION, CollectionAdminParams.SYSTEM_COLL); enabled = Boolean.parseBoolean(String.valueOf(config.properties.getOrDefault("enabled", true))); } @@ -86,7 +85,10 @@ public class SystemLogListener extends TriggerListenerBase { if (!enabled) { return; } - try { + try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder() + .withZkHost(coreContainer.getZkController().getZkServerAddress()) + .withHttpClient(coreContainer.getUpdateShardHandler().getHttpClient()) + .build()) { SolrInputDocument doc = new SolrInputDocument(); doc.addField(CommonParams.TYPE, DOC_TYPE); doc.addField(SOURCE_FIELD, SOURCE); @@ -120,8 +122,7 @@ public class SystemLogListener extends TriggerListenerBase { } UpdateRequest req = new UpdateRequest(); req.add(doc); - req.setParam(CollectionAdminParams.COLLECTION, collection); - clusterDataProvider.request(req); + cloudSolrClient.request(req, collection); } catch (Exception e) { if ((e instanceof SolrException) && e.getMessage().contains("Collection not found")) { // relatively benign diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java index fd03d389f77..7aff8464c22 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java @@ -16,26 +16,14 @@ */ package org.apache.solr.cloud.autoscaling; -import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.lucene.util.IOUtils; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; -import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.Utils; -import org.apache.solr.core.SolrResourceLoader; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -49,130 +37,19 @@ import org.slf4j.LoggerFactory; public abstract class TriggerBase implements AutoScaling.Trigger { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - protected final String name; - protected final ClusterDataProvider clusterDataProvider; - protected final Map properties = new HashMap<>(); - protected final TriggerEventType eventType; - protected final int waitForSecond; + protected SolrZkClient zkClient; protected Map lastState; - protected final AtomicReference processorRef = new AtomicReference<>(); - protected final List actions; - protected final boolean enabled; - protected boolean isClosed; - protected TriggerBase(String name, Map properties, SolrResourceLoader loader, ClusterDataProvider clusterDataProvider) { - this.name = name; - if (properties != null) { - this.properties.putAll(properties); - } - this.clusterDataProvider = clusterDataProvider; - this.enabled = Boolean.parseBoolean(String.valueOf(this.properties.getOrDefault("enabled", "true"))); - this.eventType = TriggerEventType.valueOf(this.properties.getOrDefault("event", TriggerEventType.INVALID.toString()).toString().toUpperCase(Locale.ROOT)); - this.waitForSecond = ((Long) this.properties.getOrDefault("waitFor", -1L)).intValue(); - List> o = (List>) properties.get("actions"); - if (o != null && !o.isEmpty()) { - actions = new ArrayList<>(3); - for (Map map : o) { - TriggerAction action = loader.newInstance(map.get("class"), TriggerAction.class); - actions.add(action); - } - } else { - actions = Collections.emptyList(); - } - + protected TriggerBase(SolrZkClient zkClient) { + this.zkClient = zkClient; try { - if (!clusterDataProvider.hasData(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH)) { - clusterDataProvider.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH); - } - } catch (IOException e) { + zkClient.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, false, true); + } catch (KeeperException | InterruptedException e) { LOG.warn("Exception checking ZK path " + ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, e); } } - @Override - public void init() { - List> o = (List>) properties.get("actions"); - if (o != null && !o.isEmpty()) { - for (int i = 0; i < o.size(); i++) { - Map map = o.get(i); - actions.get(i).init(map); - } - } - } - - @Override - public void setProcessor(AutoScaling.TriggerEventProcessor processor) { - processorRef.set(processor); - } - - @Override - public AutoScaling.TriggerEventProcessor getProcessor() { - return processorRef.get(); - } - - @Override - public String getName() { - return name; - } - - @Override - public TriggerEventType getEventType() { - return eventType; - } - - @Override - public boolean isEnabled() { - return enabled; - } - - @Override - public int getWaitForSecond() { - return waitForSecond; - } - - @Override - public Map getProperties() { - return properties; - } - - @Override - public List getActions() { - return actions; - } - - @Override - public boolean isClosed() { - synchronized (this) { - return isClosed; - } - } - @Override - public void close() throws IOException { - synchronized (this) { - isClosed = true; - IOUtils.closeWhileHandlingException(actions); - } - } - - @Override - public int hashCode() { - return Objects.hash(name, properties); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (obj.getClass().equals(this.getClass())) { - TriggerBase that = (TriggerBase) obj; - return this.name.equals(that.name) - && this.properties.equals(that.properties); - } - return false; - } - /** * Prepare and return internal state of this trigger in a format suitable for persisting in ZK. * @return map of internal state properties. Note: values must be supported by {@link Utils#toJSON(Object)}. @@ -195,15 +72,15 @@ public abstract class TriggerBase implements AutoScaling.Trigger { byte[] data = Utils.toJSON(state); String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName(); try { - if (clusterDataProvider.hasData(path)) { + if (zkClient.exists(path, true)) { // update - clusterDataProvider.setData(path, data, -1); + zkClient.setData(path, data, -1, true); } else { // create - clusterDataProvider.createData(path, data, CreateMode.PERSISTENT); + zkClient.create(path, data, CreateMode.PERSISTENT, true); } lastState = state; - } catch (IOException e) { + } catch (KeeperException | InterruptedException e) { LOG.warn("Exception updating trigger state '" + path + "'", e); } } @@ -213,10 +90,10 @@ public abstract class TriggerBase implements AutoScaling.Trigger { byte[] data = null; String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName(); try { - if (clusterDataProvider.hasData(path)) { - ClusterDataProvider.VersionedData versionedDat = clusterDataProvider.getData(path); + if (zkClient.exists(path, true)) { + data = zkClient.getData(path, null, new Stat(), true); } - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { LOG.warn("Exception getting trigger state '" + path + "'", e); } if (data != null) { diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java index c0b248ffc82..99f641ca631 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java @@ -1,24 +1,23 @@ package org.apache.solr.cloud.autoscaling; -import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Map; -import java.util.Queue; -import org.apache.solr.client.solrj.cloud.DistributedQueue; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; +import org.apache.solr.cloud.DistributedQueue; import org.apache.solr.cloud.Overseer; +import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.Utils; import org.apache.solr.util.TimeSource; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * */ -public class TriggerEventQueue { +public class TriggerEventQueue extends DistributedQueue { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String ENQUEUE_TIME = "_enqueue_time_"; @@ -26,11 +25,9 @@ public class TriggerEventQueue { private final String triggerName; private final TimeSource timeSource; - private final DistributedQueue delegate; - public TriggerEventQueue(ClusterDataProvider clusterDataProvider, String triggerName, Overseer.Stats stats) throws IOException { - // TODO: collect stats - this.delegate = clusterDataProvider.getDistributedQueueFactory().makeQueue(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName); + public TriggerEventQueue(SolrZkClient zookeeper, String triggerName, Overseer.Stats stats) { + super(zookeeper, ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName, stats); this.triggerName = triggerName; this.timeSource = TimeSource.CURRENT_TIME; } @@ -39,9 +36,9 @@ public class TriggerEventQueue { event.getProperties().put(ENQUEUE_TIME, timeSource.getTime()); try { byte[] data = Utils.toJSON(event); - delegate.offer(data); + offer(data); return true; - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { LOG.warn("Exception adding event " + event + " to queue " + triggerName, e); return false; } @@ -50,7 +47,7 @@ public class TriggerEventQueue { public TriggerEvent peekEvent() { byte[] data; try { - while ((data = delegate.peek()) != null) { + while ((data = peek()) != null) { if (data.length == 0) { LOG.warn("ignoring empty data..."); continue; @@ -63,7 +60,7 @@ public class TriggerEventQueue { continue; } } - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { LOG.warn("Exception peeking queue of trigger " + triggerName, e); } return null; @@ -72,7 +69,7 @@ public class TriggerEventQueue { public TriggerEvent pollEvent() { byte[] data; try { - while ((data = delegate.poll()) != null) { + while ((data = poll()) != null) { if (data.length == 0) { LOG.warn("ignoring empty data..."); continue; @@ -85,7 +82,7 @@ public class TriggerEventQueue { continue; } } - } catch (Exception e) { + } catch (KeeperException | InterruptedException e) { LOG.warn("Exception polling queue of trigger " + triggerName, e); } return null; diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java index 4236522d0db..3688bfcdcbc 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java @@ -19,7 +19,6 @@ package org.apache.solr.cloud.autoscaling; import java.io.Closeable; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage; import org.apache.solr.core.CoreContainer; @@ -29,7 +28,7 @@ import org.apache.solr.core.CoreContainer; */ public interface TriggerListener extends Closeable { - void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) throws Exception; + void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) throws Exception; AutoScalingConfig.TriggerListenerConfig getConfig(); diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java index d8b8fa3d53e..01a44135d77 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java @@ -19,7 +19,7 @@ package org.apache.solr.cloud.autoscaling; import java.io.IOException; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; +import org.apache.solr.core.CoreContainer; /** * Base class for implementations of {@link TriggerListener}. @@ -27,11 +27,11 @@ import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; public abstract class TriggerListenerBase implements TriggerListener { protected AutoScalingConfig.TriggerListenerConfig config; - protected ClusterDataProvider clusterDataProvider; + protected CoreContainer coreContainer; @Override - public void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) { - this.clusterDataProvider = clusterDataProvider; + public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) { + this.coreContainer = coreContainer; this.config = config; } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ZkDistributedQueueFactory.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ZkDistributedQueueFactory.java deleted file mode 100644 index 4cf57f98534..00000000000 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ZkDistributedQueueFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.apache.solr.cloud.autoscaling; - -import java.io.IOException; - -import org.apache.solr.client.solrj.cloud.DistributedQueue; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; -import org.apache.solr.cloud.ZkDistributedQueue; -import org.apache.solr.common.cloud.SolrZkClient; - -/** - * - */ -public class ZkDistributedQueueFactory implements ClusterDataProvider.DistributedQueueFactory { - private final SolrZkClient zkClient; - - public ZkDistributedQueueFactory(SolrZkClient zkClient) { - this.zkClient = zkClient; - } - @Override - public DistributedQueue makeQueue(String path) throws IOException { - return new ZkDistributedQueue(zkClient, path); - } - - @Override - public void removeQueue(String path) throws IOException { - - } -} diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 57847f0e3e3..0198f9ddf98 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -956,8 +956,6 @@ public class CoreContainer { SolrException.log(log, null, e); } catch (KeeperException e) { SolrException.log(log, null, e); - } catch (Exception e) { - SolrException.log(log, null, e); } } @@ -1379,8 +1377,6 @@ public class CoreContainer { throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted while unregistering core [" + name + "] from cloud state"); } catch (KeeperException e) { throw new SolrException(ErrorCode.SERVER_ERROR, "Error unregistering core [" + name + "] from cloud state", e); - } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, "Error unregistering core [" + name + "] from cloud state", e); } } } diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java index 37155ca632e..16ba4d861dd 100644 --- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java +++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java @@ -227,8 +227,6 @@ public class ZkContainer { } catch (InterruptedException e) { Thread.interrupted(); ZkContainer.log.error("", e); - } catch (Exception e) { - ZkContainer.log.error("", e); } } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 849fc4936bb..45f6ea2ebca 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -46,7 +46,7 @@ import org.apache.solr.client.solrj.request.GenericSolrRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.SimpleSolrResponse; import org.apache.solr.cloud.CloudDescriptor; -import org.apache.solr.client.solrj.cloud.DistributedQueue; +import org.apache.solr.cloud.DistributedQueue; import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.overseer.OverseerAction; diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java index 028525ee1da..ed3d03b3894 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.Map; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CoreStatus; @@ -83,7 +82,8 @@ public class DeleteShardTest extends SolrCloudTestCase { } - protected void setSliceState(String collection, String slice, State state) throws Exception { + protected void setSliceState(String collection, String slice, State state) throws SolrServerException, IOException, + KeeperException, InterruptedException { CloudSolrClient client = cluster.getSolrClient(); diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java index 7d7e5f5fe74..ed33dc1bce0 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import org.apache.solr.SolrTestCaseJ4; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrjNamedThreadFactory; @@ -96,7 +95,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { String dqZNode = "/distqueue/test"; byte[] data = "hello world".getBytes(UTF8); - ZkDistributedQueue consumer = makeDistributedQueue(dqZNode); + DistributedQueue consumer = makeDistributedQueue(dqZNode); DistributedQueue producer = makeDistributedQueue(dqZNode); DistributedQueue producer2 = makeDistributedQueue(dqZNode); @@ -125,7 +124,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { String dqZNode = "/distqueue/test"; String testData = "hello world"; - ZkDistributedQueue dq = makeDistributedQueue(dqZNode); + DistributedQueue dq = makeDistributedQueue(dqZNode); assertNull(dq.peek()); Future future = executor.submit(() -> new String(dq.peek(true), UTF8)); @@ -172,7 +171,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { @Test public void testLeakChildWatcher() throws Exception { String dqZNode = "/distqueue/test"; - ZkDistributedQueue dq = makeDistributedQueue(dqZNode); + DistributedQueue dq = makeDistributedQueue(dqZNode); assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty()); assertEquals(1, dq.watcherCount()); assertFalse(dq.isDirty()); @@ -281,8 +280,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 { assertFalse(sessionId == zkClient.getSolrZooKeeper().getSessionId()); } - protected ZkDistributedQueue makeDistributedQueue(String dqZNode) throws Exception { - return new ZkDistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode)); + protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception { + return new DistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode)); } private static class QueueChangerThread extends Thread { diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java index 69658a5a44b..749abdf804d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java @@ -26,7 +26,6 @@ import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest.METHOD; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; @@ -231,7 +230,8 @@ public class ForceLeaderTest extends HttpPartitionTest { } } - protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws Exception { + protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException, + KeeperException, InterruptedException { DistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()); ZkStateReader zkStateReader = cloudClient.getZkStateReader(); diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java index 6ab859327ca..537dbba86e2 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java @@ -23,7 +23,6 @@ import java.util.Random; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create; @@ -69,7 +68,7 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase { private void testFillWorkQueue() throws Exception { try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) { - DistributedQueue distributedQueue = new ZkDistributedQueue(cloudClient.getZkStateReader().getZkClient(), + DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(), "/overseer/collection-queue-work", new Overseer.Stats()); //fill the work queue with blocked tasks by adding more than the no:of parallel tasks for (int i = 0; i < MAX_PARALLEL_TASKS+5; i++) { @@ -150,7 +149,7 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase { private void testTaskExclusivity() throws Exception, SolrServerException { - DistributedQueue distributedQueue = new ZkDistributedQueue(cloudClient.getZkStateReader().getZkClient(), + DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(), "/overseer/collection-queue-work", new Overseer.Stats()); try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) { diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index 8e959748268..378e3710350 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -37,7 +37,6 @@ import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.SolrTestCaseJ4; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; @@ -134,7 +133,7 @@ public class OverseerTest extends SolrTestCaseJ4 { zkClient.close(); } - public void createCollection(String collection, int numShards) throws Exception { + public void createCollection(String collection, int numShards) throws KeeperException, InterruptedException { ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), "name", collection, @@ -147,7 +146,7 @@ public class OverseerTest extends SolrTestCaseJ4 { } public String publishState(String collection, String coreName, String coreNodeName, String shard, Replica.State stateName, int numShards) - throws Exception { + throws KeeperException, InterruptedException, IOException { if (stateName == null) { ElectionContext ec = electionContext.remove(coreName); if (ec != null) { diff --git a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java index ea1f3e5de26..415f80f563b 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java @@ -31,7 +31,6 @@ import org.apache.lucene.util.TestUtil; import org.apache.solr.BaseDistributedSearchTestCase; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.SolrQuery; -import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java index a8f470ceac8..4a0495d6615 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java @@ -150,7 +150,7 @@ public class AutoAddReplicasPlanActionTest extends SolrCloudTestCase{ private List getOperations(JettySolrRunner actionJetty, String lostNodeName) { AutoAddReplicasPlanAction action = new AutoAddReplicasPlanAction(); TriggerEvent lostNode = new NodeLostTrigger.NodeLostEvent(TriggerEventType.NODELOST, ".auto_add_replicas", Collections.singletonList(System.currentTimeMillis()), Collections.singletonList(lostNodeName)); - ActionContext context = new ActionContext(actionJetty.getCoreContainer().getZkController().getClusterDataProvider(), null, new HashMap<>()); + ActionContext context = new ActionContext(actionJetty.getCoreContainer(), null, new HashMap<>()); action.process(lostNode, context); List operations = (List) context.getProperty("operations"); return operations; diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java index 23c880ae89a..1cb692e8369 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java @@ -110,7 +110,7 @@ public class ExecutePlanActionTest extends SolrCloudTestCase { NodeLostTrigger.NodeLostEvent nodeLostEvent = new NodeLostTrigger.NodeLostEvent(TriggerEventType.NODELOST, "mock_trigger_name", Collections.singletonList(TimeSource.CURRENT_TIME.getTime()), Collections.singletonList(sourceNodeName)); - ActionContext actionContext = new ActionContext(survivor.getCoreContainer().getZkController().getClusterDataProvider(), null, + ActionContext actionContext = new ActionContext(survivor.getCoreContainer(), null, new HashMap<>(Collections.singletonMap("operations", operations))); action.process(nodeLostEvent, actionContext); diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java index 8dd7340aed9..687aec76624 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java @@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.core.CoreContainer; -import org.apache.solr.core.SolrResourceLoader; import org.apache.solr.util.TimeSource; import org.junit.Before; import org.junit.BeforeClass; @@ -73,8 +72,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { long waitForSeconds = 1 + random().nextInt(5); Map props = createTriggerProps(waitForSeconds); - try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) { trigger.setProcessor(noFirstRunProcessor); trigger.run(); @@ -114,8 +112,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { // add a new node but remove it before the waitFor period expires // and assert that the trigger doesn't fire at all - try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) { final long waitTime = 2; props.put("waitFor", waitTime); trigger.setProcessor(noFirstRunProcessor); @@ -160,8 +157,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { action.put("name", "testActionInit"); action.put("class", NodeAddedTriggerTest.AssertInitTriggerAction.class.getName()); actions.add(action); - try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) { assertEquals(true, actionConstructorCalled.get()); assertEquals(false, actionInitCalled.get()); assertEquals(false, actionCloseCalled.get()); @@ -202,8 +198,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { public void testListenerAcceptance() throws Exception { CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer(); Map props = createTriggerProps(0); - try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) { trigger.setProcessor(noFirstRunProcessor); trigger.run(); // starts tracking live nodes @@ -239,8 +234,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { // add a new node but update the trigger before the waitFor period expires // and assert that the new trigger still fires - NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider()); + NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container); trigger.setProcessor(noFirstRunProcessor); trigger.run(); @@ -248,8 +242,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { trigger.run(); // this run should detect the new node trigger.close(); // close the old trigger - try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("some_different_name", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("some_different_name", props, container)) { try { newTrigger.restoreState(trigger); fail("Trigger should only be able to restore state from an old trigger of the same name"); @@ -258,8 +251,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase { } } - try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container)) { AtomicBoolean fired = new AtomicBoolean(false); AtomicReference eventRef = new AtomicReference<>(); newTrigger.setProcessor(event -> { diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java index 0198098d0ee..9833c5d9452 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java @@ -73,8 +73,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { long waitForSeconds = 1 + random().nextInt(5); Map props = createTriggerProps(waitForSeconds); - try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) { trigger.setProcessor(noFirstRunProcessor); trigger.run(); String lostNodeName1 = cluster.getJettySolrRunner(1).getNodeName(); @@ -118,8 +117,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { // remove a node but add it back before the waitFor period expires // and assert that the trigger doesn't fire at all - try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) { final long waitTime = 2; props.put("waitFor", waitTime); trigger.setProcessor(noFirstRunProcessor); @@ -175,8 +173,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { action.put("name", "testActionInit"); action.put("class", AssertInitTriggerAction.class.getName()); actions.add(action); - try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) { assertEquals(true, actionConstructorCalled.get()); assertEquals(false, actionInitCalled.get()); assertEquals(false, actionCloseCalled.get()); @@ -217,8 +214,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { public void testListenerAcceptance() throws Exception { CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer(); Map props = createTriggerProps(0); - try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) { trigger.setProcessor(noFirstRunProcessor); JettySolrRunner newNode = cluster.startJettySolrRunner(); @@ -271,8 +267,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { // remove a node but update the trigger before the waitFor period expires // and assert that the new trigger still fires - NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider()); + NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container); trigger.setProcessor(noFirstRunProcessor); trigger.run(); @@ -289,8 +284,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { trigger.run(); // this run should detect the lost node trigger.close(); // close the old trigger - try (NodeLostTrigger newTrigger = new NodeLostTrigger("some_different_name", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeLostTrigger newTrigger = new NodeLostTrigger("some_different_name", props, container)) { try { newTrigger.restoreState(trigger); fail("Trigger should only be able to restore state from an old trigger of the same name"); @@ -299,8 +293,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase { } } - try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container.getResourceLoader(), - container.getZkController().getClusterDataProvider())) { + try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container)) { AtomicBoolean fired = new AtomicBoolean(false); AtomicReference eventRef = new AtomicReference<>(); newTrigger.setProcessor(event -> { diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java index 6bf1082d7b7..bd3d5fdb472 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java @@ -168,7 +168,7 @@ public class TestPolicyCloud extends SolrCloudTestCase { CollectionAdminRequest.createCollection("metricsTest", "conf", 1, 1) .process(cluster.getSolrClient()); DocCollection collection = getCollectionState("metricsTest"); - SolrClientDataProvider provider = new SolrClientDataProvider(new ZkDistributedQueueFactory(cluster.getZkClient()), solrClient); + SolrClientDataProvider provider = new SolrClientDataProvider(solrClient); List tags = Arrays.asList("metrics:solr.node:ADMIN./admin/authorization.clientErrors:count", "metrics:solr.jvm:buffers.direct.Count"); Map val = provider.getNodeValues(collection .getReplicas().get(0).getNodeName(), tags); @@ -268,7 +268,7 @@ public class TestPolicyCloud extends SolrCloudTestCase { CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "shard1", 2) .process(cluster.getSolrClient()); DocCollection rulesCollection = getCollectionState("policiesTest"); - SolrClientDataProvider provider = new SolrClientDataProvider(new ZkDistributedQueueFactory(cluster.getZkClient()), cluster.getSolrClient()); + SolrClientDataProvider provider = new SolrClientDataProvider(cluster.getSolrClient()); Map val = provider.getNodeValues(rulesCollection.getReplicas().get(0).getNodeName(), Arrays.asList( "freedisk", "cores", diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java index 24dafc81333..a0eab351af5 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java @@ -34,7 +34,6 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; -import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.client.solrj.embedded.JettySolrRunner; @@ -966,8 +965,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { public static class TestTriggerListener extends TriggerListenerBase { @Override - public void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) { - super.init(clusterDataProvider, config); + public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) { + super.init(coreContainer, config); listenerCreated.countDown(); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java deleted file mode 100644 index 44684f054fc..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.solr.client.solrj.cloud; - -import java.util.Collection; -import java.util.function.Predicate; - -import org.apache.solr.common.util.Pair; - -/** - * - */ -public interface DistributedQueue { - byte[] peek() throws Exception; - - byte[] peek(boolean block) throws Exception; - - byte[] peek(long wait) throws Exception; - - byte[] poll() throws Exception; - - byte[] remove() throws Exception; - - byte[] take() throws Exception; - - void offer(byte[] data) throws Exception; - - Collection> peekElements(int max, long waitMillis, Predicate acceptFilter) throws Exception; - -} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ClusterDataProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ClusterDataProvider.java index 8e77d62d5e4..58972afaa01 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ClusterDataProvider.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ClusterDataProvider.java @@ -19,28 +19,12 @@ package org.apache.solr.client.solrj.cloud.autoscaling; import java.io.Closeable; import java.io.IOException; -import java.net.ConnectException; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; -import org.apache.http.client.HttpClient; -import org.apache.solr.client.solrj.SolrRequest; -import org.apache.solr.client.solrj.SolrResponse; -import org.apache.solr.client.solrj.cloud.DistributedQueue; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; -import org.apache.zookeeper.Watcher; - -/** - * This interface abstracts the details of dealing with Zookeeper and Solr from the autoscaling framework. - */ public interface ClusterDataProvider extends Closeable { - /** - * Get the value of each tag for a given node + /**Get the value of each tag for a given node * * @param node node name * @param tags tag names @@ -50,87 +34,19 @@ public interface ClusterDataProvider extends Closeable { /** * Get the details of each replica in a node. It attempts to fetch as much details about - * the replica as mentioned in the keys list. It is not necessary to give all details + * the replica as mentioned in the keys list. It is not necessary to give al details *

* the format is {collection:shard :[{replicadetails}]} */ Map>> getReplicaInfo(String node, Collection keys); - /** - * Get the current set of live nodes. - */ - Collection getLiveNodes(); + Collection getNodes(); - ClusterState getClusterState() throws IOException; - - Map getClusterProperties(); - - default T getClusterProperty(String key, T defaultValue) { - T value = (T) getClusterProperties().get(key); - if (value == null) - return defaultValue; - return value; - } - - AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws ConnectException, InterruptedException, IOException; - - default AutoScalingConfig getAutoScalingConfig() throws ConnectException, InterruptedException, IOException { - return getAutoScalingConfig(null); - } - - /** - * Get the collection-specific policy + /**Get the collection-specific policy */ String getPolicyNameByCollection(String coll); @Override default void close() throws IOException { } - - // ZK-like methods - - boolean hasData(String path) throws IOException; - - List listData(String path) throws NoSuchElementException, IOException; - - class VersionedData { - public final int version; - public final byte[] data; - - public VersionedData(int version, byte[] data) { - this.version = version; - this.data = data; - } - } - - VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException; - - default VersionedData getData(String path) throws NoSuchElementException, IOException { - return getData(path, null); - } - - // mutators - - void makePath(String path) throws IOException; - - void createData(String path, byte[] data, CreateMode mode) throws IOException; - - void removeData(String path, int version) throws NoSuchElementException, IOException; - - void setData(String path, byte[] data, int version) throws NoSuchElementException, IOException; - - List multi(final Iterable ops) throws IOException; - - // Solr-like methods - - SolrResponse request(SolrRequest req) throws IOException; - - HttpClient getHttpClient(); - - interface DistributedQueueFactory { - DistributedQueue makeQueue(String path) throws IOException; - void removeQueue(String path) throws IOException; - } - - DistributedQueueFactory getDistributedQueueFactory(); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingClusterDataProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingClusterDataProvider.java deleted file mode 100644 index 54f4a80fa99..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingClusterDataProvider.java +++ /dev/null @@ -1,118 +0,0 @@ -package org.apache.solr.client.solrj.cloud.autoscaling; - -import java.io.IOException; -import java.net.ConnectException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; - -import org.apache.http.client.HttpClient; -import org.apache.solr.client.solrj.SolrRequest; -import org.apache.solr.client.solrj.SolrResponse; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; -import org.apache.zookeeper.Watcher; - -/** - * - */ -public class DelegatingClusterDataProvider implements ClusterDataProvider { - protected ClusterDataProvider delegate; - - public DelegatingClusterDataProvider(ClusterDataProvider delegate) { - this.delegate = delegate; - } - - @Override - public Map getNodeValues(String node, Collection tags) { - return delegate.getNodeValues(node, tags); - } - - @Override - public Map>> getReplicaInfo(String node, Collection keys) { - return delegate.getReplicaInfo(node, keys); - } - - @Override - public Collection getLiveNodes() { - return delegate.getLiveNodes(); - } - - @Override - public Map getClusterProperties() { - return delegate.getClusterProperties(); - } - - @Override - public ClusterState getClusterState() throws IOException { - return delegate.getClusterState(); - } - - @Override - public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws ConnectException, InterruptedException, IOException { - return delegate.getAutoScalingConfig(watcher); - } - - @Override - public String getPolicyNameByCollection(String coll) { - return delegate.getPolicyNameByCollection(coll); - } - - @Override - public boolean hasData(String path) throws IOException { - return delegate.hasData(path); - } - - @Override - public List listData(String path) throws NoSuchElementException, IOException { - return delegate.listData(path); - } - - @Override - public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException { - return delegate.getData(path, watcher); - } - - @Override - public void makePath(String path) throws IOException { - delegate.makePath(path); - } - - @Override - public void createData(String path, byte[] data, CreateMode mode) throws IOException { - delegate.createData(path, data, mode); - } - - @Override - public void removeData(String path, int version) throws NoSuchElementException, IOException { - delegate.removeData(path, version); - } - - @Override - public void setData(String path, byte[] data, int version) throws NoSuchElementException, IOException { - delegate.setData(path, data, version); - } - - @Override - public List multi(Iterable ops) throws IOException { - return delegate.multi(ops); - } - - @Override - public SolrResponse request(SolrRequest req) throws IOException { - return delegate.request(req); - } - - @Override - public HttpClient getHttpClient() { - return delegate.getHttpClient(); - } - - @Override - public DistributedQueueFactory getDistributedQueueFactory() { - return delegate.getDistributedQueueFactory(); - } -} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java index e1606859919..9c90e348dc6 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java @@ -201,7 +201,7 @@ public class Policy implements MapWriter { } Session(ClusterDataProvider dataProvider) { - this.nodes = new ArrayList<>(dataProvider.getLiveNodes()); + this.nodes = new ArrayList<>(dataProvider.getNodes()); this.dataProvider = dataProvider; for (String node : nodes) { collections.addAll(dataProvider.getReplicaInfo(node, Collections.emptyList()).keySet()); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java index da4c7de793f..a8c99ec97ec 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java @@ -49,7 +49,22 @@ public class PolicyHelper { List nodesList) { List positions = new ArrayList<>(); final ClusterDataProvider delegate = cdp; - cdp = new DelegatingClusterDataProvider(delegate) { + cdp = new ClusterDataProvider() { + @Override + public Map getNodeValues(String node, Collection tags) { + return delegate.getNodeValues(node, tags); + } + + @Override + public Map>> getReplicaInfo(String node, Collection keys) { + return delegate.getReplicaInfo(node, keys); + } + + @Override + public Collection getNodes() { + return delegate.getNodes(); + } + @Override public String getPolicyNameByCollection(String coll) { return policyMapping.get() != null && policyMapping.get().containsKey(coll) ? diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java index fcada43cb6b..fe1121d3565 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java @@ -46,7 +46,7 @@ public class Row implements MapWriter { if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>(); this.node = node; cells = new Cell[params.size()]; - isLive = dataProvider.getLiveNodes().contains(node); + isLive = dataProvider.getNodes().contains(node); Map vals = isLive ? dataProvider.getNodeValues(node, params) : Collections.emptyMap(); for (int i = 0; i < params.size(); i++) { String s = params.get(i); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientDataProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientDataProvider.java index a5ef4534821..d63f5f0c52c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientDataProvider.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientDataProvider.java @@ -28,15 +28,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Queue; import java.util.Set; -import org.apache.http.client.HttpClient; import org.apache.solr.client.solrj.SolrRequest; -import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider; import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; import org.apache.solr.client.solrj.request.GenericSolrRequest; @@ -45,7 +40,6 @@ import org.apache.solr.common.MapWriter; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; -import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.rule.ImplicitSnitch; import org.apache.solr.common.cloud.rule.RemoteCallback; @@ -56,12 +50,7 @@ import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.Utils; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,19 +63,16 @@ public class SolrClientDataProvider implements ClusterDataProvider, MapWriter { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final CloudSolrClient solrClient; - private final DistributedQueueFactory queueFactory; - private final ZkStateReader zkStateReader; - private final SolrZkClient zkClient; private final Map>>> data = new HashMap<>(); + private Set liveNodes; private Map snitchSession = new HashMap<>(); private Map nodeVsTags = new HashMap<>(); - public SolrClientDataProvider(DistributedQueueFactory queueFactory, CloudSolrClient solrClient) { - this.queueFactory = queueFactory; + public SolrClientDataProvider(CloudSolrClient solrClient) { this.solrClient = solrClient; - this.zkStateReader = solrClient.getZkStateReader(); - this.zkClient = zkStateReader.getZkClient(); + ZkStateReader zkStateReader = solrClient.getZkStateReader(); ClusterState clusterState = zkStateReader.getClusterState(); + this.liveNodes = clusterState.getLiveNodes(); Map all = clusterState.getCollectionStates(); all.forEach((collName, ref) -> { DocCollection coll = ref.get(); @@ -121,135 +107,16 @@ public class SolrClientDataProvider implements ClusterDataProvider, MapWriter { } @Override - public Collection getLiveNodes() { - return solrClient.getZkStateReader().getClusterState().getLiveNodes(); + public Collection getNodes() { + return liveNodes; } @Override public void writeMap(EntryWriter ew) throws IOException { - ew.put("liveNodes", zkStateReader.getClusterState().getLiveNodes()); + ew.put("liveNodes", liveNodes); ew.put("replicaInfo", Utils.getDeepCopy(data, 5)); ew.put("nodeValues", nodeVsTags); - } - @Override - public Map getClusterProperties() { - return zkStateReader.getClusterProperties(); - } - - @Override - public ClusterState getClusterState() { - return zkStateReader.getClusterState(); - } - - @Override - public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws IOException { - try { - return zkStateReader.getAutoScalingConfig(watcher); - } catch (KeeperException | InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public boolean hasData(String path) throws IOException { - try { - return zkClient.exists(path, true); - } catch (KeeperException | InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public List listData(String path) throws NoSuchElementException, IOException { - try { - return zkClient.getChildren(path, null, true); - } catch (KeeperException.NoNodeException e) { - throw new NoSuchElementException(path); - } catch (KeeperException | InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException { - Stat stat = new Stat(); - try { - byte[] bytes = zkClient.getData(path, watcher, stat, true); - return new VersionedData(stat.getVersion(), bytes); - } catch (KeeperException.NoNodeException e) { - throw new NoSuchElementException(path); - } catch (KeeperException | InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public void makePath(String path) throws IOException { - try { - zkClient.makePath(path, true); - } catch (KeeperException | InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public void createData(String path, byte[] data, CreateMode mode) throws IOException { - try { - zkClient.create(path, data, mode, true); - } catch (KeeperException | InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public void removeData(String path, int version) throws NoSuchElementException, IOException { - try { - zkClient.delete(path, version, true); - } catch (KeeperException.NoNodeException e) { - throw new NoSuchElementException(path); - } catch (KeeperException | InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public void setData(String path, byte[] data, int version) throws NoSuchElementException, IOException { - try { - zkClient.setData(path, data, version, true); - } catch (KeeperException.NoNodeException e) { - throw new NoSuchElementException(path); - } catch (KeeperException | InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public SolrResponse request(SolrRequest req) throws IOException { - try { - return req.process(solrClient); - } catch (SolrServerException e) { - throw new IOException(e); - } - } - - @Override - public List multi(Iterable ops) throws IOException { - try { - return zkClient.multi(ops, true); - } catch (Exception e) { - throw new IOException(e); - } - } - - @Override - public HttpClient getHttpClient() { - return solrClient.getHttpClient(); - } - - @Override - public DistributedQueueFactory getDistributedQueueFactory() { - return queueFactory; } static class ClientSnitchCtx diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java index 4d7579598c6..a11eab4a940 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import com.google.common.collect.ImmutableList; import org.apache.solr.SolrTestCaseJ4; @@ -421,7 +422,7 @@ public class TestPolicy extends SolrTestCaseJ4 { }); }); - return new DelegatingClusterDataProvider(null) { + return new ClusterDataProvider(){ @Override public Map getNodeValues(String node, Collection tags) { return (Map) Utils.getObjectByPath(m,false, Arrays.asList("nodeValues", node)); @@ -433,7 +434,7 @@ public class TestPolicy extends SolrTestCaseJ4 { } @Override - public Collection getLiveNodes() { + public Collection getNodes() { return (Collection) m.get("liveNodes"); } @@ -962,7 +963,7 @@ public class TestPolicy extends SolrTestCaseJ4 { Policy policy = new Policy((Map) Utils.fromJSONString(autoscaleJson)); ClusterDataProvider clusterDataProvider = getClusterDataProvider(nodeValues, clusterState); - ClusterDataProvider cdp = new DelegatingClusterDataProvider(null) { + ClusterDataProvider cdp = new ClusterDataProvider() { @Override public Map getNodeValues(String node, Collection tags) { return clusterDataProvider.getNodeValues(node, tags); @@ -974,8 +975,8 @@ public class TestPolicy extends SolrTestCaseJ4 { } @Override - public Collection getLiveNodes() { - return clusterDataProvider.getLiveNodes(); + public Collection getNodes() { + return clusterDataProvider.getNodes(); } @Override @@ -1040,7 +1041,7 @@ public class TestPolicy extends SolrTestCaseJ4 { " 'freedisk':918005641216}}}"); Policy policy = new Policy((Map) Utils.fromJSONString(autoscaleJson)); - Policy.Session session = policy.createSession(new DelegatingClusterDataProvider(null) { + Policy.Session session = policy.createSession(new ClusterDataProvider() { @Override public Map getNodeValues(String node, Collection tags) { return tagsMap.get(node); @@ -1052,7 +1053,7 @@ public class TestPolicy extends SolrTestCaseJ4 { } @Override - public Collection getLiveNodes() { + public Collection getNodes() { return replicaInfoMap.keySet(); } @@ -1098,7 +1099,7 @@ public class TestPolicy extends SolrTestCaseJ4 { "}"); Policy policy = new Policy((Map) Utils.fromJSONString(rules)); ClusterDataProvider clusterDataProvider = getClusterDataProvider(nodeValues, clusterState); - ClusterDataProvider cdp = new DelegatingClusterDataProvider(null) { + ClusterDataProvider cdp = new ClusterDataProvider() { @Override public Map getNodeValues(String node, Collection tags) { return clusterDataProvider.getNodeValues(node, tags); @@ -1110,8 +1111,8 @@ public class TestPolicy extends SolrTestCaseJ4 { } @Override - public Collection getLiveNodes() { - return clusterDataProvider.getLiveNodes(); + public Collection getNodes() { + return clusterDataProvider.getNodes(); } @Override @@ -1130,7 +1131,7 @@ public class TestPolicy extends SolrTestCaseJ4 { } private ClusterDataProvider getClusterDataProvider(final Map nodeValues, String clusterState) { - return new DelegatingClusterDataProvider(null) { + return new ClusterDataProvider() { @Override public Map getNodeValues(String node, Collection tags) { Map result = new LinkedHashMap<>(); @@ -1139,7 +1140,7 @@ public class TestPolicy extends SolrTestCaseJ4 { } @Override - public Collection getLiveNodes() { + public Collection getNodes() { return nodeValues.keySet(); } @@ -1167,7 +1168,7 @@ public class TestPolicy extends SolrTestCaseJ4 { " '127.0.0.1:50096_solr':{" + " 'cores':0," + " 'port':'50096'}}"); - ClusterDataProvider dataProvider = new DelegatingClusterDataProvider(null) { + ClusterDataProvider dataProvider = new ClusterDataProvider() { @Override public Map getNodeValues(String node, Collection keys) { Map result = new LinkedHashMap<>(); @@ -1186,7 +1187,7 @@ public class TestPolicy extends SolrTestCaseJ4 { } @Override - public Collection getLiveNodes() { + public Collection getNodes() { return Arrays.asList( "127.0.0.1:50097_solr", "127.0.0.1:50096_solr"); } }; @@ -1224,7 +1225,7 @@ public class TestPolicy extends SolrTestCaseJ4 { "node4:{cores:0, freedisk: 900, heap:16900, nodeRole:overseer, sysprop.rack:rack2}" + "}"); - ClusterDataProvider dataProvider = new DelegatingClusterDataProvider(null) { + ClusterDataProvider dataProvider = new ClusterDataProvider() { @Override public Map getNodeValues(String node, Collection keys) { Map result = new LinkedHashMap<>(); @@ -1243,7 +1244,7 @@ public class TestPolicy extends SolrTestCaseJ4 { } @Override - public Collection getLiveNodes() { + public Collection getNodes() { return Arrays.asList("node1", "node2", "node3", "node4"); } };