SOLR-11285: Initial refactoring.

Andrzej Bialecki 2017-08-24 15:09:37 +02:00
parent 1a1286b54b
commit aee54ff7d1
54 changed files with 976 additions and 553 deletions

View File

@ -18,6 +18,7 @@
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Collections;
@ -71,7 +72,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws KeeperException, InterruptedException {
throws IOException, InterruptedException {
log.debug("addReplica() : {}", Utils.toJSONString(message));
String collection = message.getStr(COLLECTION_PROP);
String node = message.getStr(CoreAdminParams.NODE);
@ -120,7 +121,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
} else {
node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;// TODO: use replica type in this logic too
ocmh.overseer.getClusterDataProvider(), ocmh.overseer.getCoreContainer()).get(0).nodeName;// TODO: use replica type in this logic too
}
}
log.info("Node Identified {} for creating new replica", node);
@ -159,7 +160,11 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (coreNodeName != null) {
props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
}
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
try {
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
}
}
params.set(CoreAdminParams.CORE_NODE_NAME,
ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
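
The pattern above funnels any failure from the state update queue into a SolrException instead of letting raw ZooKeeper exceptions escape addReplica(). A minimal sketch of that translation, using only the DistributedQueue interface and SolrException shown in this diff (the helper class and method names are illustrative):

import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.common.SolrException;

class StateQueueHelper {
  // Offer a serialized message, translating any queue/transport failure
  // into a generic server error for the caller.
  static void enqueue(DistributedQueue queue, byte[] payload) {
    try {
      queue.offer(payload);
    } catch (Exception e) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Exception updating Overseer state queue", e);
    }
  }
}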

View File

@ -31,6 +31,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
@ -243,10 +244,10 @@ public class Assign {
List<String> shardNames,
int numNrtReplicas,
int numTlogReplicas,
int numPullReplicas) throws KeeperException, InterruptedException {
int numPullReplicas) throws IOException, InterruptedException {
List<Map> rulesMap = (List) message.get("rule");
String policyName = message.getStr(POLICY);
AutoScalingConfig autoScalingConfig = ocmh.zkStateReader.getAutoScalingConfig();
AutoScalingConfig autoScalingConfig = ocmh.overseer.getClusterDataProvider().getAutoScalingConfig();
if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
log.debug("Identify nodes using default");
@ -295,7 +296,7 @@ public class Assign {
PolicyHelper.SESSION_REF.set(ocmh.policySessionRef);
try {
return getPositionsUsingPolicy(collectionName,
shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.zkStateReader, nodeList);
shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.overseer.getClusterDataProvider(), nodeList);
} finally {
PolicyHelper.SESSION_REF.remove();
}
@ -324,7 +325,7 @@ public class Assign {
// could be created on live nodes given maxShardsPerNode, Replication factor (if from createShard) etc.
public static List<ReplicaCount> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
String shard, int nrtReplicas,
Object createNodeSet, CoreContainer cc) throws KeeperException, InterruptedException {
Object createNodeSet, ClusterDataProvider cdp, CoreContainer cc) throws IOException, InterruptedException {
log.debug("getNodesForNewReplicas() shard: {} , replicas : {} , createNodeSet {}", shard, nrtReplicas, createNodeSet );
DocCollection coll = clusterState.getCollection(collectionName);
Integer maxShardsPerNode = coll.getMaxShardsPerNode();
@ -356,13 +357,14 @@ public class Assign {
List l = (List) coll.get(DocCollection.RULE);
List<ReplicaPosition> replicaPositions = null;
if (l != null) {
// TODO nocommit: make it so that this method doesn't require access to CC
replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cc, coll, createNodeList, l);
}
String policyName = coll.getStr(POLICY);
AutoScalingConfig autoScalingConfig = cc.getZkController().zkStateReader.getAutoScalingConfig();
AutoScalingConfig autoScalingConfig = cdp.getAutoScalingConfig();
if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0,
policyName, cc.getZkController().zkStateReader, createNodeList);
policyName, cdp, createNodeList);
}
if (replicaPositions != null) {
@ -383,21 +385,18 @@ public class Assign {
int nrtReplicas,
int tlogReplicas,
int pullReplicas,
String policyName, ZkStateReader zkStateReader,
List<String> nodesList) throws KeeperException, InterruptedException {
String policyName, ClusterDataProvider cdp,
List<String> nodesList) throws IOException, InterruptedException {
log.debug("shardnames {} NRT {} TLOG {} PULL {} , policy {}, nodeList {}", shardNames, nrtReplicas, tlogReplicas, pullReplicas, policyName, nodesList);
SolrClientDataProvider clientDataProvider = null;
List<ReplicaPosition> replicaPositions = null;
AutoScalingConfig autoScalingConfig = zkStateReader.getAutoScalingConfig();
try (CloudSolrClient csc = new CloudSolrClient.Builder()
.withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
.build()) {
clientDataProvider = new SolrClientDataProvider(csc);
AutoScalingConfig autoScalingConfig = cdp.getAutoScalingConfig();
try {
Map<String, String> kvMap = Collections.singletonMap(collName, policyName);
replicaPositions = PolicyHelper.getReplicaLocations(
collName,
autoScalingConfig,
clientDataProvider,
cdp,
kvMap,
shardNames,
nrtReplicas,
@ -405,7 +404,7 @@ public class Assign {
pullReplicas,
nodesList);
return replicaPositions;
} catch (IOException e) {
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error computing replica positions", e);
} finally {
if (log.isTraceEnabled()) {
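
With this change getPositionsUsingPolicy() is driven entirely by a ClusterDataProvider instead of building a throwaway CloudSolrClient around a ZkStateReader. A hedged usage sketch (collection, shard, and node names are made up, and the method is assumed to remain accessible to callers in org.apache.solr.cloud):

import java.util.Arrays;
import java.util.List;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.cloud.Assign;
import org.apache.solr.common.cloud.ReplicaPosition;

class AssignUsage {
  static List<ReplicaPosition> place(ClusterDataProvider cdp) throws Exception {
    return Assign.getPositionsUsingPolicy("myColl",
        Arrays.asList("shard1", "shard2"),
        2, 0, 0,   // NRT / TLOG / PULL replica counts
        null,      // no named policy; the cluster policy applies
        cdp,
        Arrays.asList("node1:8983_solr", "node2:8983_solr"));
  }
}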

View File

@ -17,6 +17,7 @@
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
@ -27,6 +28,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
@ -103,7 +105,7 @@ public class CreateShardCmd implements Cmd {
numPullReplicas);
} else {
List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
createNodeSetStr, ocmh.overseer.getZkController().getCoreContainer());
createNodeSetStr, ocmh.overseer.getClusterDataProvider(), ocmh.overseer.getCoreContainer());
int i = 0;
positions = new ArrayList<>();
for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
@ -173,8 +175,8 @@ public class CreateShardCmd implements Cmd {
}
static boolean usePolicyFramework(DocCollection collection, OverseerCollectionMessageHandler ocmh)
throws KeeperException, InterruptedException {
Map autoScalingJson = Utils.getJson(ocmh.zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
return autoScalingJson.get(Policy.CLUSTER_POLICY) != null || collection.getPolicyName() != null;
throws IOException, InterruptedException {
AutoScalingConfig autoScalingConfig = ocmh.overseer.getClusterDataProvider().getAutoScalingConfig();
return !autoScalingConfig.isEmpty() || collection.getPolicyName() != null;
}
}
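
The rewritten usePolicyFramework() reads the parsed AutoScalingConfig from the ClusterDataProvider instead of fetching raw JSON from ZooKeeper. A minimal sketch of the same check, assuming AutoScalingConfig.isEmpty() is true only when no policy or triggers are configured:

import java.io.IOException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.common.cloud.DocCollection;

class PolicyCheck {
  // The policy framework is engaged when any autoscaling config exists
  // or the collection names its own policy.
  static boolean usePolicy(DocCollection coll, ClusterDataProvider cdp)
      throws IOException, InterruptedException {
    AutoScalingConfig cfg = cdp.getAutoScalingConfig();
    return !cfg.isEmpty() || coll.getPolicyName() != null;
  }
}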

View File

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;

View File

@ -223,7 +223,11 @@ class ShardLeaderElectionContextBase extends ElectionContext {
ZkStateReader.CORE_NAME_PROP,
leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
try {
Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
} catch (Exception e) {
throw new IOException("Overseer state update queue error", e);
}
}
public LeaderElector getLeaderElector() {
@ -312,7 +316,11 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// clear the leader in clusterstate
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
try {
Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
} catch (Exception e) {
throw new IOException("Overseer state update queue error", e);
}
boolean allReplicasInLine = false;
if (!weAreReplacement) {
@ -494,7 +502,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws KeeperException, InterruptedException {
public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception {
if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) {
ZkStateReader zkStateReader = zkController.getZkStateReader();
zkStateReader.forceUpdateCollection(collection);

View File

@ -30,9 +30,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.codahale.metrics.Timer;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.cloud.autoscaling.AutoScaling;
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
import org.apache.solr.cloud.autoscaling.ZkDistributedQueueFactory;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.cloud.overseer.NodeMutator;
@ -53,6 +59,7 @@ import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.request.LocalSolrQueryRequest;
@ -88,10 +95,10 @@ public class Overseer implements Closeable {
private final SolrZkClient zkClient;
private final String myId;
//queue where everybody can throw tasks
private final DistributedQueue stateUpdateQueue;
private final ZkDistributedQueue stateUpdateQueue;
//Internal queue where overseer stores events that have not yet been published into cloudstate
//If Overseer dies while extracting the main queue, a new overseer will start from this queue
private final DistributedQueue workQueue;
private final ZkDistributedQueue workQueue;
// Internal map which holds the information about running tasks.
private final DistributedMap runningMap;
// Internal map which holds the information about successfully completed tasks.
@ -538,7 +545,8 @@ public class Overseer implements Closeable {
autoscalingTriggerCreator.start();
ThreadGroup triggerThreadGroup = new ThreadGroup("Overseer autoscaling triggers");
OverseerTriggerThread trigger = new OverseerTriggerThread(zkController);
OverseerTriggerThread trigger = new OverseerTriggerThread(zkController.getCoreContainer().getResourceLoader(),
zkController.getClusterDataProvider());
triggerThread = new OverseerThread(triggerThreadGroup, trigger, "OverseerAutoScalingTriggerThread-" + id);
updaterThread.start();
@ -554,7 +562,15 @@ public class Overseer implements Closeable {
ZkController getZkController(){
return zkController;
}
public CoreContainer getCoreContainer() {
return zkController.getCoreContainer();
}
public ClusterDataProvider getClusterDataProvider() {
return zkController.getClusterDataProvider();
}
/**
* For tests.
*
@ -679,7 +695,7 @@ public class Overseer implements Closeable {
* This method will create the /overseer znode in ZooKeeper if it does not exist already.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
public static DistributedQueue getStateUpdateQueue(final SolrZkClient zkClient) {
return getStateUpdateQueue(zkClient, new Stats());
@ -692,11 +708,11 @@ public class Overseer implements Closeable {
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @param zkStats a {@link Overseer.Stats} object which tracks statistics for all zookeeper operations performed by this queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static DistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) {
static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/queue", zkStats);
return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats);
}
/**
@ -712,11 +728,11 @@ public class Overseer implements Closeable {
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @param zkStats a {@link Overseer.Stats} object which tracks statistics for all zookeeper operations performed by this queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static DistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/queue-work", zkStats);
return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
}
/* Internal map for failed tasks, not to be used outside of the Overseer */
@ -750,7 +766,7 @@ public class Overseer implements Closeable {
* see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
return getCollectionQueue(zkClient, new Stats());
@ -768,7 +784,7 @@ public class Overseer implements Closeable {
* see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
@ -788,7 +804,7 @@ public class Overseer implements Closeable {
* see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) {
return getConfigSetQueue(zkClient, new Stats());
@ -811,7 +827,7 @@ public class Overseer implements Closeable {
* {@link OverseerConfigSetMessageHandler}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link DistributedQueue} object
* @return a {@link ZkDistributedQueue} object
*/
static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
// For now, we use the same queue as the collection queue, but ensure
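
Internally the queues are now ZkDistributedQueue instances, but the public factory methods keep returning the DistributedQueue interface, so existing callers are unaffected. A hedged usage sketch (the QueueUsage helper is illustrative):

import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.util.Utils;

class QueueUsage {
  // Callers still program against the interface; the ZK-backed
  // implementation is an internal detail of the Overseer factories.
  static void offerStateUpdate(SolrZkClient zkClient, ZkNodeProps msg) throws Exception {
    DistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
    q.offer(Utils.toJSON(msg));
  }
}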

View File

@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
@ -321,7 +322,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
throws KeeperException, InterruptedException {
throws Exception {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
@ -332,7 +333,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
inQueue.offer(Utils.toJSON(m));
}
private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
@ -441,7 +442,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
}
void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {
void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
ZkStateReader.CORE_NAME_PROP, core,
@ -462,8 +463,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
//TODO: should we not remove this in the next release?
private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results)
throws KeeperException, InterruptedException {
private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
final String collectionName = message.getStr(COLLECTION_PROP);
boolean firstLoop = true;
@ -634,7 +634,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results)
throws KeeperException, InterruptedException {
throws Exception {
final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
//the rest of the processing is based on writing cluster state properties
@ -712,7 +712,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws KeeperException, InterruptedException {
throws Exception {
return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);
}

View File

@ -56,7 +56,7 @@ public class OverseerNodePrioritizer {
this.shardHandlerFactory = shardHandlerFactory;
}
public synchronized void prioritizeOverseerNodes(String overseerId) throws KeeperException, InterruptedException {
public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception {
SolrZkClient zk = zkStateReader.getZkClient();
if (!zk.exists(ZkStateReader.ROLES, true)) return;
Map m = (Map) Utils.fromJSON(zk.getData(ZkStateReader.ROLES, null, new Stat(), true));

View File

@ -35,11 +35,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link DistributedQueue} augmented with helper methods specific to the overseer task queues.
* A {@link ZkDistributedQueue} augmented with helper methods specific to the overseer task queues.
* Methods specific to this subclass ignore superclass internal state and hit ZK directly.
* This is inefficient! But the API on this class is kind of muddy.
*/
public class OverseerTaskQueue extends DistributedQueue {
public class OverseerTaskQueue extends ZkDistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String RESPONSE_PREFIX = "qnr-" ;

View File

@ -174,7 +174,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
final private void recoveryFailed(final SolrCore core,
final ZkController zkController, final String baseUrl,
final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
final String shardZkNodeName, final CoreDescriptor cd) throws Exception {
SolrException.log(LOG, "Recovery failed - I give up.");
try {
zkController.publish(cd, Replica.State.RECOVERY_FAILED);
@ -297,7 +297,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
}
final public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
final public void doRecovery(SolrCore core) throws Exception {
if (core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog()) {
doSyncOrReplicateRecovery(core);
} else {
@ -440,7 +440,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
// TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
final public void doSyncOrReplicateRecovery(SolrCore core) throws KeeperException, InterruptedException {
final public void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
boolean replayed = false;
boolean successfulRecovery = false;

View File

@ -32,8 +32,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.cloud.ReplicaPosition;

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;

View File

@ -51,10 +51,16 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.autoscaling.ZkDistributedQueueFactory;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
@ -191,6 +197,8 @@ public class ZkController {
private final SolrZkClient zkClient;
private final ZkCmdExecutor cmdExecutor;
public final ZkStateReader zkStateReader;
private ClusterDataProvider clusterDataProvider;
private CloudSolrClient cloudSolrClient;
private final String zkServerAddress; // example: 127.0.0.1:54062/solr
@ -435,7 +443,7 @@ public class ZkController {
});
init(registerOnReconnect);
assert ObjectReleaseTracker.track(this);
}
@ -554,6 +562,12 @@ public class ZkController {
IOUtils.closeQuietly(overseerElector.getContext());
IOUtils.closeQuietly(overseer);
} finally {
if (cloudSolrClient != null) {
IOUtils.closeQuietly(cloudSolrClient);
}
if (clusterDataProvider != null) {
IOUtils.closeQuietly(clusterDataProvider);
}
try {
try {
zkStateReader.close();
@ -588,6 +602,22 @@ public class ZkController {
return zkStateReader.getClusterState();
}
public ClusterDataProvider getClusterDataProvider() {
if (clusterDataProvider != null) {
return clusterDataProvider;
}
synchronized(this) {
if (clusterDataProvider != null) {
return clusterDataProvider;
}
cloudSolrClient = new CloudSolrClient.Builder()
.withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
.build();
clusterDataProvider = new SolrClientDataProvider(new ZkDistributedQueueFactory(zkClient), cloudSolrClient);
}
return clusterDataProvider;
}
/**
* Returns config file data (in bytes)
*/
@ -1290,18 +1320,18 @@ public class ZkController {
return baseURL;
}
public void publish(final CoreDescriptor cd, final Replica.State state) throws KeeperException, InterruptedException {
public void publish(final CoreDescriptor cd, final Replica.State state) throws Exception {
publish(cd, state, true);
}
public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws KeeperException, InterruptedException {
public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception {
publish(cd, state, updateLastState, false);
}
/**
* Publish core state to overseer.
*/
public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState, boolean forcePublish) throws KeeperException, InterruptedException {
public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState, boolean forcePublish) throws Exception {
if (!forcePublish) {
try (SolrCore core = cc.getCore(cd.getName())) {
if (core == null || core.isClosed()) {
@ -1410,7 +1440,7 @@ public class ZkController {
return true;
}
public void unregister(String coreName, CoreDescriptor cd) throws InterruptedException, KeeperException {
public void unregister(String coreName, CoreDescriptor cd) throws Exception {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName();
@ -1441,8 +1471,7 @@ public class ZkController {
overseerJobQueue.offer(Utils.toJSON(m));
}
public void createCollection(String collection) throws KeeperException,
InterruptedException {
public void createCollection(String collection) throws Exception {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.CREATE.toLower(), ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.COLLECTION_PROP, collection);
@ -1567,6 +1596,9 @@ public class ZkController {
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (Exception e) {
log.error("", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
if (cd.getCloudDescriptor().getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getClusterState(), coreNodeName)) {
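
getClusterDataProvider() above lazily builds the CloudSolrClient-backed provider the first time it is requested, under a lock, and both objects are closed with the controller. A minimal sketch of the idiom with a stand-in resource type; note the sketch marks the field volatile for safe unsynchronized reads, which the patch itself does not:

class LazyHolder {
  private volatile ExpensiveResource resource;

  ExpensiveResource get() {
    ExpensiveResource r = resource;
    if (r != null) {
      return r;        // fast path: already built
    }
    synchronized (this) {
      if (resource == null) {
        resource = new ExpensiveResource();  // built once; closed by the owner
      }
      return resource;
    }
  }

  static class ExpensiveResource {}
}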

View File

@ -30,6 +30,7 @@ import java.util.function.Predicate;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
@ -43,11 +44,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A distributed queue. Optimized for single-consumer,
* A ZK-based distributed queue. Optimized for single-consumer,
* multiple-producer: if there are multiple consumers on the same ZK queue,
* the results should be correct but inefficient
*/
public class DistributedQueue {
public class ZkDistributedQueue implements DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final String PREFIX = "qn-";
@ -92,11 +93,11 @@ public class DistributedQueue {
private int watcherCount = 0;
public DistributedQueue(SolrZkClient zookeeper, String dir) {
public ZkDistributedQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats());
}
public DistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) {
public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) {
this.dir = dir;
ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
@ -119,6 +120,7 @@ public class DistributedQueue {
*
* @return data at the first element of the queue, or null.
*/
@Override
public byte[] peek() throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_peek");
try {
@ -135,6 +137,7 @@ public class DistributedQueue {
* @param block if true, blocks until an element enters the queue
* @return data at the first element of the queue, or null.
*/
@Override
public byte[] peek(boolean block) throws KeeperException, InterruptedException {
return block ? peek(Long.MAX_VALUE) : peek();
}
@ -146,6 +149,7 @@ public class DistributedQueue {
* @param wait max wait time in ms.
* @return data at the first element of the queue, or null.
*/
@Override
public byte[] peek(long wait) throws KeeperException, InterruptedException {
Preconditions.checkArgument(wait > 0);
Timer.Context time;
@ -177,6 +181,7 @@ public class DistributedQueue {
*
* @return Head of the queue or null.
*/
@Override
public byte[] poll() throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_poll");
try {
@ -191,6 +196,7 @@ public class DistributedQueue {
*
* @return The former head of the queue
*/
@Override
public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_remove");
try {
@ -209,6 +215,7 @@ public class DistributedQueue {
*
* @return The former head of the queue
*/
@Override
public byte[] take() throws KeeperException, InterruptedException {
// Same as for element. Should refactor this.
Timer.Context timer = stats.time(dir + "_take");
@ -231,6 +238,7 @@ public class DistributedQueue {
* Inserts data into queue. If there are no other queue consumers, the offered element
* will be immediately visible when this method returns.
*/
@Override
public void offer(byte[] data) throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_offer");
try {
@ -326,7 +334,8 @@ public class DistributedQueue {
* <p/>
* Package-private to support {@link OverseerTaskQueue} specifically.
*/
Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
@Override
public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
List<String> foundChildren = new ArrayList<>();
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
boolean first = true;
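
The @Override annotations added above imply the shape of the new solrj-side DistributedQueue interface. A reconstruction from those signatures (the throws clauses are a guess; the real interface may declare something narrower than Exception):

import java.util.Collection;
import java.util.function.Predicate;
import org.apache.solr.common.util.Pair;

interface DistributedQueueSketch {
  byte[] peek() throws Exception;
  byte[] peek(boolean block) throws Exception;
  byte[] peek(long wait) throws Exception;
  byte[] poll() throws Exception;
  byte[] remove() throws Exception;          // NoSuchElementException if empty
  byte[] take() throws Exception;            // blocks until an element arrives
  void offer(byte[] data) throws Exception;
  Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis,
      Predicate<String> acceptFilter) throws Exception;
}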

View File

@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.Map;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.common.MapWriter;
import org.apache.solr.core.CoreContainer;
@ -30,18 +31,18 @@ import org.apache.solr.core.CoreContainer;
*/
public class ActionContext implements MapWriter {
private final CoreContainer coreContainer;
private final ClusterDataProvider clusterDataProvider;
private final AutoScaling.Trigger source;
private final Map<String, Object> properties;
public ActionContext(CoreContainer coreContainer, AutoScaling.Trigger source, Map<String, Object> properties) {
this.coreContainer = coreContainer;
public ActionContext(ClusterDataProvider clusterDataProvider, AutoScaling.Trigger source, Map<String, Object> properties) {
this.clusterDataProvider = clusterDataProvider;
this.source = source;
this.properties = properties;
}
public CoreContainer getCoreContainer() {
return coreContainer;
public ClusterDataProvider getClusterDataProvider() {
return clusterDataProvider;
}
public AutoScaling.Trigger getSource() {
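
Trigger actions now reach cluster state through the context rather than a CoreContainer. A hypothetical action using the new accessor (getLiveNodes() is assumed from its use in NodeAddedTrigger later in this commit, and the referenced autoscaling classes are assumed public):

import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.cloud.autoscaling.ActionContext;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;

class LoggingAction extends TriggerActionBase {
  @Override
  public void process(TriggerEvent event, ActionContext context) {
    ClusterDataProvider cdp = context.getClusterDataProvider();
    System.out.println("live nodes at event time: " + cdp.getLiveNodes());
  }
}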

View File

@ -18,8 +18,12 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
@ -27,15 +31,20 @@ import org.apache.solr.common.cloud.ZkStateReader;
public class AutoAddReplicasPlanAction extends ComputePlanAction {
@Override
protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ZkStateReader zkStateReader) {
protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ClusterDataProvider cdp) {
// for backward compatibility
String autoAddReplicas = zkStateReader.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
String autoAddReplicas = cdp.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
return new NoneSuggester();
}
Policy.Suggester suggester = super.getSuggester(session, event, zkStateReader);
ClusterState clusterState = zkStateReader.getClusterState();
Policy.Suggester suggester = super.getSuggester(session, event, cdp);
ClusterState clusterState;
try {
clusterState = cdp.getClusterState();
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception getting cluster state", e);
}
boolean anyCollections = false;
for (DocCollection collection: clusterState.getCollectionsMap().values()) {

View File

@ -24,8 +24,10 @@ import java.util.Map;
import com.google.common.base.Preconditions;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
public class AutoScaling {
@ -110,30 +112,13 @@ public class AutoScaling {
void init();
}
public static class TriggerFactory implements Closeable {
/**
* Factory to produce instances of {@link Trigger}.
*/
public static abstract class TriggerFactory implements Closeable {
protected boolean isClosed = false;
private final CoreContainer coreContainer;
private boolean isClosed = false;
public TriggerFactory(CoreContainer coreContainer) {
Preconditions.checkNotNull(coreContainer);
this.coreContainer = coreContainer;
}
public synchronized Trigger create(TriggerEventType type, String name, Map<String, Object> props) {
if (isClosed) {
throw new AlreadyClosedException("TriggerFactory has already been closed, cannot create new triggers");
}
switch (type) {
case NODEADDED:
return new NodeAddedTrigger(name, props, coreContainer);
case NODELOST:
return new NodeLostTrigger(name, props, coreContainer);
default:
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
}
}
public abstract Trigger create(TriggerEventType type, String name, Map<String, Object> props);
@Override
public void close() throws IOException {
@ -143,6 +128,38 @@ public class AutoScaling {
}
}
/**
* Default implementation of {@link TriggerFactory}.
*/
public static class TriggerFactoryImpl extends TriggerFactory {
private final ClusterDataProvider clusterDataProvider;
private final SolrResourceLoader loader;
public TriggerFactoryImpl(SolrResourceLoader loader, ClusterDataProvider clusterDataProvider) {
Preconditions.checkNotNull(clusterDataProvider);
Preconditions.checkNotNull(loader);
this.clusterDataProvider = clusterDataProvider;
this.loader = loader;
}
@Override
public synchronized Trigger create(TriggerEventType type, String name, Map<String, Object> props) {
if (isClosed) {
throw new AlreadyClosedException("TriggerFactory has already been closed, cannot create new triggers");
}
switch (type) {
case NODEADDED:
return new NodeAddedTrigger(name, props, loader, clusterDataProvider);
case NODELOST:
return new NodeLostTrigger(name, props, loader, clusterDataProvider);
default:
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
}
}
}
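
Construction of the concrete factory is now explicit about its two dependencies. A hedged wiring sketch (the trigger name and props map are illustrative; real trigger configs carry more properties):

import java.util.Collections;
import java.util.Map;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.autoscaling.AutoScaling;
import org.apache.solr.core.SolrResourceLoader;

class FactoryUsage {
  static AutoScaling.Trigger nodeAdded(SolrResourceLoader loader, ClusterDataProvider cdp) {
    AutoScaling.TriggerFactory factory = new AutoScaling.TriggerFactoryImpl(loader, cdp);
    Map<String, Object> props = Collections.<String, Object>singletonMap("event", "nodeAdded");
    return factory.create(TriggerEventType.NODEADDED, "node_added_trigger", props);
  }
}
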
public static final String AUTO_ADD_REPLICAS_TRIGGER_DSL =
"{" +
" 'set-trigger' : {" +

View File

@ -214,7 +214,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
try (CloudSolrClient build = new CloudSolrClient.Builder()
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.withZkHost(container.getZkController().getZkServerAddress()).build()) {
Policy.Session session = policy.createSession(new SolrClientDataProvider(build));
Policy.Session session = policy.createSession(new SolrClientDataProvider(new ZkDistributedQueueFactory(container.getZkController().getZkClient()), build));
List<Row> sorted = session.getSorted();
List<Clause.Violation> violations = session.getViolations();
@ -638,7 +638,8 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
try (CloudSolrClient build = new CloudSolrClient.Builder()
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.withZkHost(container.getZkController().getZkServerAddress()).build()) {
Policy.Session session = autoScalingConf.getPolicy().createSession(new SolrClientDataProvider(build));
Policy.Session session = autoScalingConf.getPolicy()
.createSession(new SolrClientDataProvider(new ZkDistributedQueueFactory(container.getZkController().getZkClient()), build));
log.debug("Verified autoscaling configuration");
}
}

View File

@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
@ -48,40 +49,30 @@ public class ComputePlanAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) {
log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());
CoreContainer container = context.getCoreContainer();
ClusterDataProvider cdp = context.getClusterDataProvider();
try {
try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
.withZkHost(container.getZkController().getZkServerAddress())
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.build()) {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
AutoScalingConfig autoScalingConf = zkStateReader.getAutoScalingConfig();
if (autoScalingConf.isEmpty()) {
log.error("Action: " + getName() + " executed but no policy is configured");
return;
}
Policy policy = autoScalingConf.getPolicy();
Policy.Session session = policy.createSession(new SolrClientDataProvider(cloudSolrClient));
Policy.Suggester suggester = getSuggester(session, event, zkStateReader);
while (true) {
SolrRequest operation = suggester.getOperation();
if (operation == null) break;
log.info("Computed Plan: {}", operation.getParams());
Map<String, Object> props = context.getProperties();
props.compute("operations", (k, v) -> {
List<SolrRequest> operations = (List<SolrRequest>) v;
if (operations == null) operations = new ArrayList<>();
operations.add(operation);
return operations;
});
session = suggester.getSession();
suggester = getSuggester(session, event, zkStateReader);
}
AutoScalingConfig autoScalingConf = cdp.getAutoScalingConfig();
if (autoScalingConf.isEmpty()) {
log.error("Action: " + getName() + " executed but no policy is configured");
return;
}
Policy policy = autoScalingConf.getPolicy();
Policy.Session session = policy.createSession(cdp);
Policy.Suggester suggester = getSuggester(session, event, cdp);
while (true) {
SolrRequest operation = suggester.getOperation();
if (operation == null) break;
log.info("Computed Plan: {}", operation.getParams());
Map<String, Object> props = context.getProperties();
props.compute("operations", (k, v) -> {
List<SolrRequest> operations = (List<SolrRequest>) v;
if (operations == null) operations = new ArrayList<>();
operations.add(operation);
return operations;
});
session = suggester.getSession();
suggester = getSuggester(session, event, cdp);
}
} catch (KeeperException e) {
log.error("ZooKeeperException while processing event: " + event, e);
} catch (InterruptedException e) {
log.error("Interrupted while processing event: " + event, e);
} catch (IOException e) {
log.error("IOException while processing event: " + event, e);
} catch (Exception e) {
@ -89,7 +80,7 @@ public class ComputePlanAction extends TriggerActionBase {
}
}
protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ZkStateReader zkStateReader) {
protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ClusterDataProvider cdp) {
Policy.Suggester suggester;
switch (event.getEventType()) {
case NODEADDED:
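
The loop above repeatedly asks the suggester for an operation and appends each one to the "operations" list carried in the context properties. A minimal sketch of that accumulation step, with the suggester abstracted as a supplier:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.SolrRequest;

class PlanLoop {
  @SuppressWarnings("unchecked")
  static void collect(Map<String, Object> props, Supplier<SolrRequest> nextOp) {
    while (true) {
      SolrRequest op = nextOp.get();
      if (op == null) break;          // suggester has no more moves
      props.compute("operations", (k, v) -> {
        List<SolrRequest> ops = (List<SolrRequest>) v;
        if (ops == null) ops = new ArrayList<>();
        ops.add(op);
        return ops;
      });
    }
  }
}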

View File

@ -25,6 +25,7 @@ import java.util.List;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrException;
@ -43,32 +44,29 @@ public class ExecutePlanAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) {
log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());
CoreContainer container = context.getCoreContainer();
ClusterDataProvider clusterDataProvider = context.getClusterDataProvider();
List<SolrRequest> operations = (List<SolrRequest>) context.getProperty("operations");
if (operations == null || operations.isEmpty()) {
log.info("No operations to execute for event: {}", event);
return;
}
try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
.withZkHost(container.getZkController().getZkServerAddress())
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.build()) {
try {
for (SolrRequest operation : operations) {
log.info("Executing operation: {}", operation.getParams());
try {
SolrResponse response = operation.process(cloudSolrClient);
SolrResponse response = clusterDataProvider.request(operation);
context.getProperties().compute("responses", (s, o) -> {
List<NamedList<Object>> responses = (List<NamedList<Object>>) o;
if (responses == null) responses = new ArrayList<>(operations.size());
responses.add(response.getResponse());
return responses;
});
} catch (SolrServerException | HttpSolrClient.RemoteSolrException e) {
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected exception executing operation: " + operation.getParams(), e);
}
}
} catch (IOException e) {
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected IOException while processing event: " + event, e);
}
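
Execution now goes through ClusterDataProvider.request() (visible in the diff above) instead of a CloudSolrClient built per invocation. A hedged sketch of that path:

import java.util.List;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.common.SolrException;

class ExecuteSketch {
  static void run(ClusterDataProvider cdp, List<SolrRequest> operations) {
    for (SolrRequest op : operations) {
      try {
        SolrResponse rsp = cdp.request(op);
        // rsp.getResponse() would be recorded under "responses" in the
        // real action's context properties
      } catch (Exception e) {
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
            "Unexpected exception executing operation: " + op.getParams(), e);
      }
    }
  }
}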

View File

@ -30,6 +30,7 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.common.util.Utils;
@ -72,9 +73,9 @@ public class HttpTriggerListener extends TriggerListenerBase {
private boolean followRedirects;
@Override
public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
super.init(coreContainer, config);
httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
public void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) {
super.init(clusterDataProvider, config);
httpClient = clusterDataProvider.getHttpClient();
urlTemplate = (String)config.properties.get("url");
payloadTemplate = (String)config.properties.get("payload");
contentType = (String)config.properties.get("contentType");

View File

@ -17,29 +17,24 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.TimeSource;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,60 +44,28 @@ import org.slf4j.LoggerFactory;
public class NodeAddedTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String name;
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
private final TriggerEventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
private Set<String> lastLiveNodes;
private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
public NodeAddedTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
super(container.getZkController().getZkClient());
this.name = name;
this.properties = properties;
this.container = container;
SolrResourceLoader loader,
ClusterDataProvider clusterDataProvider) {
super(name, properties, loader, clusterDataProvider);
this.timeSource = TimeSource.CURRENT_TIME;
this.processorRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
for (Map<String, String> map : o) {
TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class);
actions.add(action);
}
} else {
actions = Collections.emptyList();
}
lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
lastLiveNodes = new HashSet<>(clusterDataProvider.getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = Boolean.parseBoolean(String.valueOf(properties.getOrDefault("enabled", "true")));
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
}
@Override
public void init() {
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
for (int i = 0; i < o.size(); i++) {
Map<String, String> map = o.get(i);
actions.get(i).init(map);
}
}
super.init();
// pick up added nodes for which marker paths were created
try {
List<String> added = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
List<String> added = clusterDataProvider.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
added.forEach(n -> {
// don't add nodes that have since gone away
if (lastLiveNodes.contains(n)) {
@ -111,77 +74,14 @@ public class NodeAddedTrigger extends TriggerBase {
}
removeMarker(n);
});
} catch (KeeperException.NoNodeException e) {
} catch (NoSuchElementException e) {
// ignore
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
log.warn("Exception retrieving nodeAdded markers", e);
}
}
@Override
public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@Override
public String getName() {
return name;
}
@Override
public TriggerEventType getEventType() {
return eventType;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public int getWaitForSecond() {
return waitForSecond;
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public List<TriggerAction> getActions() {
return actions;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof NodeAddedTrigger) {
NodeAddedTrigger that = (NodeAddedTrigger) obj;
return this.name.equals(that.name)
&& this.properties.equals(that.properties);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(name, properties);
}
@Override
public void close() throws IOException {
synchronized (this) {
isClosed = true;
IOUtils.closeWhileHandlingException(actions);
}
}
@Override
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
@ -229,8 +129,7 @@ public class NodeAddedTrigger extends TriggerBase {
}
log.debug("Running NodeAddedTrigger {}", name);
ZkStateReader reader = container.getZkController().getZkStateReader();
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
Set<String> newLiveNodes = new HashSet<>(clusterDataProvider.getLiveNodes());
log.debug("Found livenodes: {}", newLiveNodes);
// have any nodes that we were tracking been removed from the cluster?
@ -287,24 +186,17 @@ public class NodeAddedTrigger extends TriggerBase {
private void removeMarker(String nodeName) {
String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
try {
if (container.getZkController().getZkClient().exists(path, true)) {
container.getZkController().getZkClient().delete(path, -1, true);
if (clusterDataProvider.hasData(path)) {
clusterDataProvider.removeData(path, -1);
}
} catch (KeeperException.NoNodeException e) {
} catch (NoSuchElementException e) {
// ignore
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
log.debug("Exception removing nodeAdded marker " + nodeName, e);
}
}
@Override
public boolean isClosed() {
synchronized (this) {
return isClosed;
}
}
public static class NodeAddedEvent extends TriggerEvent {
public NodeAddedEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames) {
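
Most of the boilerplate deleted above (name, properties, actions, processor, enabled/waitFor parsing, close/isClosed) moves into TriggerBase, whose constructor is implied by the super(...) call. An assumed sketch of that base-class surface, not the actual implementation:

import java.util.Map;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.core.SolrResourceLoader;

abstract class TriggerBaseSketch {
  protected final String name;
  protected final Map<String, Object> properties;
  protected final SolrResourceLoader loader;
  protected final ClusterDataProvider clusterDataProvider;

  protected TriggerBaseSketch(String name, Map<String, Object> properties,
                              SolrResourceLoader loader, ClusterDataProvider cdp) {
    this.name = name;
    this.properties = properties;
    this.loader = loader;
    this.clusterDataProvider = cdp;
  }
}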

View File

@ -17,29 +17,24 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.TimeSource;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,59 +44,27 @@ import org.slf4j.LoggerFactory;
public class NodeLostTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String name;
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
private final TriggerEventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
private Set<String> lastLiveNodes;
private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
public NodeLostTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
super(container.getZkController().getZkClient());
this.name = name;
this.properties = properties;
this.container = container;
SolrResourceLoader loader,
ClusterDataProvider clusterDataProvider) {
super(name, properties, loader, clusterDataProvider);
this.timeSource = TimeSource.CURRENT_TIME;
this.processorRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
for (Map<String, String> map : o) {
TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class);
actions.add(action);
}
} else {
actions = Collections.emptyList();
}
lastLiveNodes = new HashSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
lastLiveNodes = new HashSet<>(clusterDataProvider.getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = Boolean.parseBoolean(String.valueOf(properties.getOrDefault("enabled", "true")));
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
}
@Override
public void init() {
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
for (int i = 0; i < o.size(); i++) {
Map<String, String> map = o.get(i);
actions.get(i).init(map);
}
}
super.init();
// pick up lost nodes for which marker paths were created
try {
List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
List<String> lost = clusterDataProvider.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
lost.forEach(n -> {
// don't add nodes that have since come back
if (!lastLiveNodes.contains(n)) {
@ -110,76 +73,18 @@ public class NodeLostTrigger extends TriggerBase {
}
removeMarker(n);
});
} catch (KeeperException.NoNodeException e) {
} catch (NoSuchElementException e) {
// ignore
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
log.warn("Exception retrieving nodeLost markers", e);
}
}
@Override
public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@Override
public String getName() {
return name;
}
@Override
public TriggerEventType getEventType() {
return eventType;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public int getWaitForSecond() {
return waitForSecond;
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public List<TriggerAction> getActions() {
return actions;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof NodeLostTrigger) {
NodeLostTrigger that = (NodeLostTrigger) obj;
return this.name.equals(that.name)
&& this.properties.equals(that.properties);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(name, properties);
}
@Override
public void close() throws IOException {
synchronized (this) {
isClosed = true;
IOUtils.closeWhileHandlingException(actions);
}
}
@Override
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
@ -226,8 +131,7 @@ public class NodeLostTrigger extends TriggerBase {
}
}
ZkStateReader reader = container.getZkController().getZkStateReader();
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
Set<String> newLiveNodes = new HashSet<>(clusterDataProvider.getLiveNodes());
log.debug("Running NodeLostTrigger: {} with currently live nodes: {}", name, newLiveNodes);
// have any nodes that we were tracking been added to the cluster?
@ -286,23 +190,16 @@ public class NodeLostTrigger extends TriggerBase {
private void removeMarker(String nodeName) {
String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName;
try {
if (container.getZkController().getZkClient().exists(path, true)) {
container.getZkController().getZkClient().delete(path, -1, true);
if (clusterDataProvider.hasData(path)) {
clusterDataProvider.removeData(path, -1);
}
} catch (KeeperException.NoNodeException e) {
} catch (NoSuchElementException e) {
// ignore
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
log.warn("Exception removing nodeLost marker " + nodeName, e);
}
}
@Override
public boolean isClosed() {
synchronized (this) {
return isClosed;
}
}
public static class NodeLostEvent extends TriggerEvent {
public NodeLostEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames) {

View File

@ -20,16 +20,19 @@ package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
@ -37,6 +40,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@ -52,11 +56,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ZkController zkController;
private final ZkStateReader zkStateReader;
private final SolrZkClient zkClient;
private final ClusterDataProvider clusterDataProvider;
private final ScheduledTriggers scheduledTriggers;
@ -77,12 +77,10 @@ public class OverseerTriggerThread implements Runnable, Closeable {
private AutoScalingConfig autoScalingConfig;
public OverseerTriggerThread(ZkController zkController) {
this.zkController = zkController;
zkStateReader = zkController.getZkStateReader();
zkClient = zkController.getZkClient();
scheduledTriggers = new ScheduledTriggers(zkController);
triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
public OverseerTriggerThread(SolrResourceLoader loader, ClusterDataProvider clusterDataProvider) {
this.clusterDataProvider = clusterDataProvider;
scheduledTriggers = new ScheduledTriggers(loader, clusterDataProvider);
triggerFactory = new AutoScaling.TriggerFactoryImpl(loader, clusterDataProvider);
}
@Override
@ -106,11 +104,8 @@ public class OverseerTriggerThread implements Runnable, Closeable {
try {
refreshAutoScalingConf(new AutoScalingWatcher());
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
} catch (ConnectException e) {
log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
log.error("A ZK error has occurred", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@ -201,26 +196,26 @@ public class OverseerTriggerThread implements Runnable, Closeable {
if (cleanOldNodeLostMarkers) {
log.debug("-- clean old nodeLost markers");
try {
List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
List<String> markers = clusterDataProvider.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
markers.forEach(n -> {
removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
});
} catch (KeeperException.NoNodeException e) {
} catch (NoSuchElementException e) {
// ignore
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
log.warn("Error removing old nodeLost markers", e);
}
}
if (cleanOldNodeAddedMarkers) {
log.debug("-- clean old nodeAdded markers");
try {
List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
List<String> markers = clusterDataProvider.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
markers.forEach(n -> {
removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
});
} catch (KeeperException.NoNodeException e) {
} catch (NoSuchElementException e) {
// ignore
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
log.warn("Error removing old nodeAdded markers", e);
}
@ -231,11 +226,11 @@ public class OverseerTriggerThread implements Runnable, Closeable {
private void removeNodeMarker(String path, String nodeName) {
path = path + "/" + nodeName;
try {
zkClient.delete(path, -1, true);
clusterDataProvider.removeData(path, -1);
log.debug(" -- deleted " + path);
} catch (KeeperException.NoNodeException e) {
} catch (NoSuchElementException e) {
// ignore
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
log.warn("Error removing old marker " + path, e);
}
}
@ -250,11 +245,8 @@ public class OverseerTriggerThread implements Runnable, Closeable {
try {
refreshAutoScalingConf(this);
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
log.error("A ZK error has occurred", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (ConnectException e) {
log.warn("ZooKeeper watch triggered for autoscaling conf, but we cannot talk to ZK: [{}]", e.getMessage());
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@ -266,13 +258,13 @@ public class OverseerTriggerThread implements Runnable, Closeable {
}
private void refreshAutoScalingConf(Watcher watcher) throws KeeperException, InterruptedException {
private void refreshAutoScalingConf(Watcher watcher) throws ConnectException, InterruptedException, IOException {
updateLock.lock();
try {
if (isClosed) {
return;
}
AutoScalingConfig currentConfig = zkStateReader.getAutoScalingConfig(watcher);
AutoScalingConfig currentConfig = clusterDataProvider.getAutoScalingConfig(watcher);
log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, currentConfig.getZkVersion());
if (znodeVersion >= currentConfig.getZkVersion()) {
// protect against reordered watcher fires by ensuring that we only move forward

View File

@ -41,15 +41,18 @@ import java.util.stream.Collectors;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
@ -85,17 +88,17 @@ public class ScheduledTriggers implements Closeable {
private final ActionThrottle actionThrottle;
private final SolrZkClient zkClient;
private final ClusterDataProvider clusterDataProvider;
private final SolrResourceLoader loader;
private final Overseer.Stats queueStats;
private final CoreContainer coreContainer;
private final TriggerListeners listeners;
private AutoScalingConfig autoScalingConfig;
public ScheduledTriggers(ZkController zkController) {
public ScheduledTriggers(SolrResourceLoader loader, ClusterDataProvider clusterDataProvider) {
// todo make the core pool size configurable
// it is important to use more than one because a time-consuming trigger can starve other scheduled triggers
// ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand
@ -108,8 +111,8 @@ public class ScheduledTriggers implements Closeable {
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
// todo make the wait time configurable
actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
coreContainer = zkController.getCoreContainer();
zkClient = zkController.getZkClient();
this.clusterDataProvider = clusterDataProvider;
this.loader = loader;
queueStats = new Overseer.Stats();
listeners = new TriggerListeners();
}
@ -136,7 +139,12 @@ public class ScheduledTriggers implements Closeable {
if (isClosed) {
throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
}
ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger, zkClient, queueStats);
ScheduledTrigger scheduledTrigger;
try {
scheduledTrigger = new ScheduledTrigger(newTrigger, clusterDataProvider, queueStats);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "exception creating scheduled trigger", e);
}
ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger);
if (old != null) {
if (old.trigger.equals(newTrigger)) {
@ -182,7 +190,7 @@ public class ScheduledTriggers implements Closeable {
// let the action executor thread wait instead of the trigger thread so we use the throttle here
actionThrottle.minimumWaitBetweenActions();
actionThrottle.markAttemptingAction();
ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>());
ActionContext actionContext = new ActionContext(clusterDataProvider, newTrigger, new HashMap<>());
for (TriggerAction action : actions) {
List<String> beforeActions = (List<String>)actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.BEFORE_ACTION.toString(), k -> new ArrayList<String>());
beforeActions.add(action.getName());
@ -244,23 +252,23 @@ public class ScheduledTriggers implements Closeable {
String statePath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName;
String eventsPath = ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName;
try {
if (zkClient.exists(statePath, true)) {
zkClient.delete(statePath, -1, true);
if (clusterDataProvider.hasData(statePath)) {
clusterDataProvider.removeData(statePath, -1);
}
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
log.warn("Failed to remove state for removed trigger " + statePath, e);
}
try {
if (zkClient.exists(eventsPath, true)) {
List<String> events = zkClient.getChildren(eventsPath, null, true);
if (clusterDataProvider.hasData(eventsPath)) {
List<String> events = clusterDataProvider.listData(eventsPath);
List<Op> ops = new ArrayList<>(events.size() + 1);
events.forEach(ev -> {
ops.add(Op.delete(eventsPath + "/" + ev, -1));
});
ops.add(Op.delete(eventsPath, -1));
zkClient.multi(ops, true);
clusterDataProvider.multi(ops);
}
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
log.warn("Failed to remove events for removed trigger " + eventsPath, e);
}
}
@ -297,9 +305,9 @@ public class ScheduledTriggers implements Closeable {
boolean replay;
volatile boolean isClosed;
ScheduledTrigger(AutoScaling.Trigger trigger, SolrZkClient zkClient, Overseer.Stats stats) {
ScheduledTrigger(AutoScaling.Trigger trigger, ClusterDataProvider clusterDataProvider, Overseer.Stats stats) throws IOException {
this.trigger = trigger;
this.queue = new TriggerEventQueue(zkClient, trigger.getName(), stats);
this.queue = new TriggerEventQueue(clusterDataProvider, trigger.getName(), stats);
this.replay = true;
this.isClosed = false;
}
@ -426,13 +434,13 @@ public class ScheduledTriggers implements Closeable {
if (listener == null) { // create new instance
String clazz = config.listenerClass;
try {
listener = coreContainer.getResourceLoader().newInstance(clazz, TriggerListener.class);
listener = loader.newInstance(clazz, TriggerListener.class);
} catch (Exception e) {
log.warn("Invalid TriggerListener class name '" + clazz + "', skipping...", e);
}
if (listener != null) {
try {
listener.init(coreContainer, config);
listener.init(clusterDataProvider, config);
listenersPerName.put(config.name, listener);
} catch (Exception e) {
log.warn("Error initializing TriggerListener " + config, e);

View File

@ -31,6 +31,7 @@ import java.util.StringJoiner;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
@ -73,8 +74,8 @@ public class SystemLogListener extends TriggerListenerBase {
private boolean enabled = true;
@Override
public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
super.init(coreContainer, config);
public void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) {
super.init(clusterDataProvider, config);
collection = (String)config.properties.getOrDefault(CollectionAdminParams.COLLECTION, CollectionAdminParams.SYSTEM_COLL);
enabled = Boolean.parseBoolean(String.valueOf(config.properties.getOrDefault("enabled", true)));
}
@ -85,10 +86,7 @@ public class SystemLogListener extends TriggerListenerBase {
if (!enabled) {
return;
}
try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
.withZkHost(coreContainer.getZkController().getZkServerAddress())
.withHttpClient(coreContainer.getUpdateShardHandler().getHttpClient())
.build()) {
try {
SolrInputDocument doc = new SolrInputDocument();
doc.addField(CommonParams.TYPE, DOC_TYPE);
doc.addField(SOURCE_FIELD, SOURCE);
@ -122,7 +120,8 @@ public class SystemLogListener extends TriggerListenerBase {
}
UpdateRequest req = new UpdateRequest();
req.add(doc);
cloudSolrClient.request(req, collection);
req.setParam(CollectionAdminParams.COLLECTION, collection);
clusterDataProvider.request(req);
} catch (Exception e) {
if ((e instanceof SolrException) && e.getMessage().contains("Collection not found")) {
// relatively benign

View File

@ -16,14 +16,26 @@
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@ -37,19 +49,130 @@ import org.slf4j.LoggerFactory;
public abstract class TriggerBase implements AutoScaling.Trigger {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected SolrZkClient zkClient;
protected final String name;
protected final ClusterDataProvider clusterDataProvider;
protected final Map<String, Object> properties = new HashMap<>();
protected final TriggerEventType eventType;
protected final int waitForSecond;
protected Map<String,Object> lastState;
protected final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef = new AtomicReference<>();
protected final List<TriggerAction> actions;
protected final boolean enabled;
protected boolean isClosed;
protected TriggerBase(SolrZkClient zkClient) {
this.zkClient = zkClient;
protected TriggerBase(String name, Map<String, Object> properties, SolrResourceLoader loader, ClusterDataProvider clusterDataProvider) {
this.name = name;
if (properties != null) {
this.properties.putAll(properties);
}
this.clusterDataProvider = clusterDataProvider;
this.enabled = Boolean.parseBoolean(String.valueOf(this.properties.getOrDefault("enabled", "true")));
this.eventType = TriggerEventType.valueOf(this.properties.getOrDefault("event", TriggerEventType.INVALID.toString()).toString().toUpperCase(Locale.ROOT));
this.waitForSecond = ((Long) this.properties.getOrDefault("waitFor", -1L)).intValue();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
for (Map<String, String> map : o) {
TriggerAction action = loader.newInstance(map.get("class"), TriggerAction.class);
actions.add(action);
}
} else {
actions = Collections.emptyList();
}
try {
zkClient.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, false, true);
} catch (KeeperException | InterruptedException e) {
if (!clusterDataProvider.hasData(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH)) {
clusterDataProvider.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
}
} catch (IOException e) {
LOG.warn("Exception checking ZK path " + ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, e);
}
}
@Override
public void init() {
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
for (int i = 0; i < o.size(); i++) {
Map<String, String> map = o.get(i);
actions.get(i).init(map);
}
}
}
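For orientation, a minimal sketch (not part of this commit; the action class name is illustrative) of the properties map that the constructor above parses:
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class TriggerPropsSketch {
  // Builds a properties map in the shape TriggerBase expects.
  public static Map<String, Object> exampleProps() {
    Map<String, Object> props = new HashMap<>();
    props.put("event", "nodeLost");   // upper-cased with Locale.ROOT into a TriggerEventType
    props.put("waitFor", 30L);        // seconds; read as a Long and narrowed to an int
    props.put("enabled", "true");     // Boolean.parseBoolean of the string value
    props.put("actions", Collections.singletonList(
        Collections.singletonMap("class", "org.apache.solr.cloud.autoscaling.ComputePlanAction")));
    return props;
  }
}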
@Override
public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@Override
public String getName() {
return name;
}
@Override
public TriggerEventType getEventType() {
return eventType;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public int getWaitForSecond() {
return waitForSecond;
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public List<TriggerAction> getActions() {
return actions;
}
@Override
public boolean isClosed() {
synchronized (this) {
return isClosed;
}
}
@Override
public void close() throws IOException {
synchronized (this) {
isClosed = true;
IOUtils.closeWhileHandlingException(actions);
}
}
@Override
public int hashCode() {
return Objects.hash(name, properties);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass().equals(this.getClass())) {
TriggerBase that = (TriggerBase) obj;
return this.name.equals(that.name)
&& this.properties.equals(that.properties);
}
return false;
}
/**
* Prepare and return internal state of this trigger in a format suitable for persisting in ZK.
* @return map of internal state properties. Note: values must be supported by {@link Utils#toJSON(Object)}.
@ -72,15 +195,15 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
byte[] data = Utils.toJSON(state);
String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
try {
if (zkClient.exists(path, true)) {
if (clusterDataProvider.hasData(path)) {
// update
zkClient.setData(path, data, -1, true);
clusterDataProvider.setData(path, data, -1);
} else {
// create
zkClient.create(path, data, CreateMode.PERSISTENT, true);
clusterDataProvider.createData(path, data, CreateMode.PERSISTENT);
}
lastState = state;
} catch (KeeperException | InterruptedException e) {
} catch (IOException e) {
LOG.warn("Exception updating trigger state '" + path + "'", e);
}
}
@ -90,10 +213,10 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
byte[] data = null;
String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
try {
if (zkClient.exists(path, true)) {
data = zkClient.getData(path, null, new Stat(), true);
if (clusterDataProvider.hasData(path)) {
ClusterDataProvider.VersionedData versionedData = clusterDataProvider.getData(path);
data = versionedData.data;
}
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
LOG.warn("Exception getting trigger state '" + path + "'", e);
}
if (data != null) {

View File

@ -1,23 +1,24 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Queue;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeSource;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A queue of serialized {@link TriggerEvent}s backed by a {@link DistributedQueue}.
 */
public class TriggerEventQueue extends DistributedQueue {
public class TriggerEventQueue {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String ENQUEUE_TIME = "_enqueue_time_";
@ -25,9 +26,11 @@ public class TriggerEventQueue extends DistributedQueue {
private final String triggerName;
private final TimeSource timeSource;
private final DistributedQueue delegate;
public TriggerEventQueue(SolrZkClient zookeeper, String triggerName, Overseer.Stats stats) {
super(zookeeper, ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName, stats);
public TriggerEventQueue(ClusterDataProvider clusterDataProvider, String triggerName, Overseer.Stats stats) throws IOException {
// TODO: collect stats
this.delegate = clusterDataProvider.getDistributedQueueFactory().makeQueue(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName);
this.triggerName = triggerName;
this.timeSource = TimeSource.CURRENT_TIME;
}
@ -36,9 +39,9 @@ public class TriggerEventQueue extends DistributedQueue {
event.getProperties().put(ENQUEUE_TIME, timeSource.getTime());
try {
byte[] data = Utils.toJSON(event);
offer(data);
delegate.offer(data);
return true;
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
LOG.warn("Exception adding event " + event + " to queue " + triggerName, e);
return false;
}
@ -47,7 +50,7 @@ public class TriggerEventQueue extends DistributedQueue {
public TriggerEvent peekEvent() {
byte[] data;
try {
while ((data = peek()) != null) {
while ((data = delegate.peek()) != null) {
if (data.length == 0) {
LOG.warn("ignoring empty data...");
continue;
@ -60,7 +63,7 @@ public class TriggerEventQueue extends DistributedQueue {
continue;
}
}
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
LOG.warn("Exception peeking queue of trigger " + triggerName, e);
}
return null;
@ -69,7 +72,7 @@ public class TriggerEventQueue extends DistributedQueue {
public TriggerEvent pollEvent() {
byte[] data;
try {
while ((data = poll()) != null) {
while ((data = delegate.poll()) != null) {
if (data.length == 0) {
LOG.warn("ignoring empty data...");
continue;
@ -82,7 +85,7 @@ public class TriggerEventQueue extends DistributedQueue {
continue;
}
}
} catch (KeeperException | InterruptedException e) {
} catch (Exception e) {
LOG.warn("Exception polling queue of trigger " + triggerName, e);
}
return null;
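Taken together, the refactored queue can be exercised roughly as follows; this is a hedged sketch (the trigger name is hypothetical, and the enqueue method name offerEvent is assumed from the hunk above):
// assumes a ClusterDataProvider 'cdp' whose getDistributedQueueFactory() is wired up
TriggerEventQueue queue = new TriggerEventQueue(cdp, "my_trigger", new Overseer.Stats());
queue.offerEvent(event);                 // stamps ENQUEUE_TIME and serializes via Utils.toJSON
TriggerEvent next = queue.peekEvent();   // non-destructive read, skipping empty payloads
TriggerEvent taken = queue.pollEvent();  // destructive read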

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.core.CoreContainer;
@ -28,7 +29,7 @@ import org.apache.solr.core.CoreContainer;
*/
public interface TriggerListener extends Closeable {
void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) throws Exception;
void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) throws Exception;
AutoScalingConfig.TriggerListenerConfig getConfig();

View File

@ -19,7 +19,7 @@ package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
/**
* Base class for implementations of {@link TriggerListener}.
@ -27,11 +27,11 @@ import org.apache.solr.core.CoreContainer;
public abstract class TriggerListenerBase implements TriggerListener {
protected AutoScalingConfig.TriggerListenerConfig config;
protected CoreContainer coreContainer;
protected ClusterDataProvider clusterDataProvider;
@Override
public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
this.coreContainer = coreContainer;
public void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) {
this.clusterDataProvider = clusterDataProvider;
this.config = config;
}
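A listener subclass now receives the ClusterDataProvider instead of a CoreContainer. A minimal hedged sketch (class name hypothetical, mirroring the TestTriggerListener change later in this commit):
public class MarkerTriggerListener extends TriggerListenerBase {
  @Override
  public void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) {
    super.init(clusterDataProvider, config);
    // the provider is retained in the protected clusterDataProvider field for later use
  }
}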

View File

@ -0,0 +1,28 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.cloud.ZkDistributedQueue;
import org.apache.solr.common.cloud.SolrZkClient;
/**
 * A {@link ClusterDataProvider.DistributedQueueFactory} that creates ZooKeeper-backed
 * {@link ZkDistributedQueue} instances.
 */
public class ZkDistributedQueueFactory implements ClusterDataProvider.DistributedQueueFactory {
private final SolrZkClient zkClient;
public ZkDistributedQueueFactory(SolrZkClient zkClient) {
this.zkClient = zkClient;
}
@Override
public DistributedQueue makeQueue(String path) throws IOException {
return new ZkDistributedQueue(zkClient, path);
}
@Override
public void removeQueue(String path) throws IOException {
}
}
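Hypothetical wiring of the new factory (path and payload are illustrative; exception handling is elided):
import java.nio.charset.StandardCharsets;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.common.cloud.SolrZkClient;
public class QueueFactorySketch {
  // Creates a ZK-backed queue at an illustrative path and enqueues one payload.
  static void enqueueMarker(SolrZkClient zkClient) throws Exception {
    ClusterDataProvider.DistributedQueueFactory factory = new ZkDistributedQueueFactory(zkClient);
    DistributedQueue queue = factory.makeQueue("/autoscaling/events/my_trigger");
    queue.offer("{}".getBytes(StandardCharsets.UTF_8));
  }
}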

View File

@ -956,6 +956,8 @@ public class CoreContainer {
SolrException.log(log, null, e);
} catch (KeeperException e) {
SolrException.log(log, null, e);
} catch (Exception e) {
SolrException.log(log, null, e);
}
}
@ -1377,6 +1379,8 @@ public class CoreContainer {
throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted while unregistering core [" + name + "] from cloud state");
} catch (KeeperException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error unregistering core [" + name + "] from cloud state", e);
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error unregistering core [" + name + "] from cloud state", e);
}
}
}

View File

@ -227,6 +227,8 @@ public class ZkContainer {
} catch (InterruptedException e) {
Thread.interrupted();
ZkContainer.log.error("", e);
} catch (Exception e) {
ZkContainer.log.error("", e);
}
}
}

View File

@ -46,7 +46,7 @@ import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.overseer.OverseerAction;

View File

@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreStatus;
@ -82,8 +83,7 @@ public class DeleteShardTest extends SolrCloudTestCase {
}
protected void setSliceState(String collection, String slice, State state) throws SolrServerException, IOException,
KeeperException, InterruptedException {
protected void setSliceState(String collection, String slice, State state) throws Exception {
CloudSolrClient client = cluster.getSolrClient();

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
@ -95,7 +96,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
String dqZNode = "/distqueue/test";
byte[] data = "hello world".getBytes(UTF8);
DistributedQueue consumer = makeDistributedQueue(dqZNode);
ZkDistributedQueue consumer = makeDistributedQueue(dqZNode);
DistributedQueue producer = makeDistributedQueue(dqZNode);
DistributedQueue producer2 = makeDistributedQueue(dqZNode);
@ -124,7 +125,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
String dqZNode = "/distqueue/test";
String testData = "hello world";
DistributedQueue dq = makeDistributedQueue(dqZNode);
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
assertNull(dq.peek());
Future<String> future = executor.submit(() -> new String(dq.peek(true), UTF8));
@ -171,7 +172,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
@Test
public void testLeakChildWatcher() throws Exception {
String dqZNode = "/distqueue/test";
DistributedQueue dq = makeDistributedQueue(dqZNode);
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
@ -280,8 +281,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
assertFalse(sessionId == zkClient.getSolrZooKeeper().getSessionId());
}
protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
return new DistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
protected ZkDistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
return new ZkDistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
}
private static class QueueChangerThread extends Thread {

View File

@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@ -230,8 +231,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
}
protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException,
KeeperException, InterruptedException {
protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws Exception {
DistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
ZkStateReader zkStateReader = cloudClient.getZkStateReader();

View File

@ -23,6 +23,7 @@ import java.util.Random;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
@ -68,7 +69,7 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
private void testFillWorkQueue() throws Exception {
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(),
DistributedQueue distributedQueue = new ZkDistributedQueue(cloudClient.getZkStateReader().getZkClient(),
"/overseer/collection-queue-work", new Overseer.Stats());
//fill the work queue with blocked tasks by adding more than the number of parallel tasks
for (int i = 0; i < MAX_PARALLEL_TASKS+5; i++) {
@ -149,7 +150,7 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
private void testTaskExclusivity() throws Exception, SolrServerException {
DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(),
DistributedQueue distributedQueue = new ZkDistributedQueue(cloudClient.getZkStateReader().getZkClient(),
"/overseer/collection-queue-work", new Overseer.Stats());
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {

View File

@ -37,6 +37,7 @@ import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@ -133,7 +134,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
zkClient.close();
}
public void createCollection(String collection, int numShards) throws KeeperException, InterruptedException {
public void createCollection(String collection, int numShards) throws Exception {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
"name", collection,
@ -146,7 +147,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
public String publishState(String collection, String coreName, String coreNodeName, String shard, Replica.State stateName, int numShards)
throws KeeperException, InterruptedException, IOException {
throws Exception {
if (stateName == null) {
ElectionContext ec = electionContext.remove(coreName);
if (ec != null) {

View File

@ -31,6 +31,7 @@ import org.apache.lucene.util.TestUtil;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

View File

@ -150,7 +150,7 @@ public class AutoAddReplicasPlanActionTest extends SolrCloudTestCase{
private List<SolrRequest> getOperations(JettySolrRunner actionJetty, String lostNodeName) {
AutoAddReplicasPlanAction action = new AutoAddReplicasPlanAction();
TriggerEvent lostNode = new NodeLostTrigger.NodeLostEvent(TriggerEventType.NODELOST, ".auto_add_replicas", Collections.singletonList(System.currentTimeMillis()), Collections.singletonList(lostNodeName));
ActionContext context = new ActionContext(actionJetty.getCoreContainer(), null, new HashMap<>());
ActionContext context = new ActionContext(actionJetty.getCoreContainer().getZkController().getClusterDataProvider(), null, new HashMap<>());
action.process(lostNode, context);
List<SolrRequest> operations = (List) context.getProperty("operations");
return operations;

View File

@ -110,7 +110,7 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
NodeLostTrigger.NodeLostEvent nodeLostEvent = new NodeLostTrigger.NodeLostEvent(TriggerEventType.NODELOST,
"mock_trigger_name", Collections.singletonList(TimeSource.CURRENT_TIME.getTime()),
Collections.singletonList(sourceNodeName));
ActionContext actionContext = new ActionContext(survivor.getCoreContainer(), null,
ActionContext actionContext = new ActionContext(survivor.getCoreContainer().getZkController().getClusterDataProvider(), null,
new HashMap<>(Collections.singletonMap("operations", operations)));
action.process(nodeLostEvent, actionContext);

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.TimeSource;
import org.junit.Before;
import org.junit.BeforeClass;
@ -72,7 +73,8 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
long waitForSeconds = 1 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
@ -112,7 +114,8 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
// add a new node but remove it before the waitFor period expires
// and assert that the trigger doesn't fire at all
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
final long waitTime = 2;
props.put("waitFor", waitTime);
trigger.setProcessor(noFirstRunProcessor);
@ -157,7 +160,8 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
action.put("name", "testActionInit");
action.put("class", NodeAddedTriggerTest.AssertInitTriggerAction.class.getName());
actions.add(action);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
assertEquals(true, actionConstructorCalled.get());
assertEquals(false, actionInitCalled.get());
assertEquals(false, actionCloseCalled.get());
@ -198,7 +202,8 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
public void testListenerAcceptance() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
Map<String, Object> props = createTriggerProps(0);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
trigger.setProcessor(noFirstRunProcessor);
trigger.run(); // starts tracking live nodes
@ -234,7 +239,8 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
// add a new node but update the trigger before the waitFor period expires
// and assert that the new trigger still fires
NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container);
NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider());
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
@ -242,7 +248,8 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
trigger.run(); // this run should detect the new node
trigger.close(); // close the old trigger
try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("some_different_name", props, container)) {
try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("some_different_name", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
try {
newTrigger.restoreState(trigger);
fail("Trigger should only be able to restore state from an old trigger of the same name");
@ -251,7 +258,8 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
}
}
try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setProcessor(event -> {

View File

@ -73,7 +73,8 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
long waitForSeconds = 1 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
String lostNodeName1 = cluster.getJettySolrRunner(1).getNodeName();
@ -117,7 +118,8 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
// remove a node but add it back before the waitFor period expires
// and assert that the trigger doesn't fire at all
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
final long waitTime = 2;
props.put("waitFor", waitTime);
trigger.setProcessor(noFirstRunProcessor);
@ -173,7 +175,8 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
action.put("name", "testActionInit");
action.put("class", AssertInitTriggerAction.class.getName());
actions.add(action);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) {
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
assertEquals(true, actionConstructorCalled.get());
assertEquals(false, actionInitCalled.get());
assertEquals(false, actionCloseCalled.get());
@ -214,7 +217,8 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
public void testListenerAcceptance() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
Map<String, Object> props = createTriggerProps(0);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) {
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
trigger.setProcessor(noFirstRunProcessor);
JettySolrRunner newNode = cluster.startJettySolrRunner();
@ -267,7 +271,8 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
// remove a node but update the trigger before the waitFor period expires
// and assert that the new trigger still fires
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container);
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider());
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
@ -284,7 +289,8 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
trigger.run(); // this run should detect the lost node
trigger.close(); // close the old trigger
try (NodeLostTrigger newTrigger = new NodeLostTrigger("some_different_name", props, container)) {
try (NodeLostTrigger newTrigger = new NodeLostTrigger("some_different_name", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
try {
newTrigger.restoreState(trigger);
fail("Trigger should only be able to restore state from an old trigger of the same name");
@ -293,7 +299,8 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
}
}
try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container.getResourceLoader(),
container.getZkController().getClusterDataProvider())) {
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setProcessor(event -> {

View File

@ -168,7 +168,7 @@ public class TestPolicyCloud extends SolrCloudTestCase {
CollectionAdminRequest.createCollection("metricsTest", "conf", 1, 1)
.process(cluster.getSolrClient());
DocCollection collection = getCollectionState("metricsTest");
SolrClientDataProvider provider = new SolrClientDataProvider(solrClient);
SolrClientDataProvider provider = new SolrClientDataProvider(new ZkDistributedQueueFactory(cluster.getZkClient()), solrClient);
List<String> tags = Arrays.asList("metrics:solr.node:ADMIN./admin/authorization.clientErrors:count",
"metrics:solr.jvm:buffers.direct.Count");
Map<String, Object> val = provider.getNodeValues(collection.getReplicas().get(0).getNodeName(), tags);
@ -268,7 +268,7 @@ public class TestPolicyCloud extends SolrCloudTestCase {
CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "shard1", 2)
.process(cluster.getSolrClient());
DocCollection rulesCollection = getCollectionState("policiesTest");
SolrClientDataProvider provider = new SolrClientDataProvider(cluster.getSolrClient());
SolrClientDataProvider provider = new SolrClientDataProvider(new ZkDistributedQueueFactory(cluster.getZkClient()), cluster.getSolrClient());
Map<String, Object> val = provider.getNodeValues(rulesCollection.getReplicas().get(0).getNodeName(), Arrays.asList(
"freedisk",
"cores",

View File

@ -34,6 +34,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@ -965,8 +966,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
public static class TestTriggerListener extends TriggerListenerBase {
@Override
public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
super.init(coreContainer, config);
public void init(ClusterDataProvider clusterDataProvider, AutoScalingConfig.TriggerListenerConfig config) {
super.init(clusterDataProvider, config);
listenerCreated.countDown();
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud;
import java.util.Collection;
import java.util.function.Predicate;
import org.apache.solr.common.util.Pair;
/**
 * Distributed queue abstraction, decoupled from the ZooKeeper implementation.
 */
public interface DistributedQueue {
byte[] peek() throws Exception;
byte[] peek(boolean block) throws Exception;
byte[] peek(long wait) throws Exception;
byte[] poll() throws Exception;
byte[] remove() throws Exception;
byte[] take() throws Exception;
void offer(byte[] data) throws Exception;
Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws Exception;
}
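A hedged consumption sketch against this abstraction, in the style of the TriggerEventQueue loops above (any DistributedQueue implementation works):
// Drains the queue until the first non-empty payload, or returns null if it empties.
static byte[] pollNonEmpty(DistributedQueue queue) throws Exception {
  byte[] data;
  while ((data = queue.poll()) != null) {
    if (data.length > 0) {
      return data;    // caller deserializes; empty markers are skipped
    }
  }
  return null;
}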

View File

@ -19,12 +19,28 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
/**
* This interface abstracts the details of dealing with Zookeeper and Solr from the autoscaling framework.
*/
public interface ClusterDataProvider extends Closeable {
/**Get the value of each tag for a given node
/**
* Get the value of each tag for a given node
*
* @param node node name
* @param tags tag names
@ -34,19 +50,87 @@ public interface ClusterDataProvider extends Closeable {
/**
* Get the details of each replica in a node. It attempts to fetch as much details about
* the replica as mentioned in the keys list. It is not necessary to give al details
* the replica as mentioned in the keys list. It is not necessary to give all details
* <p>
* the format is {collection: {shard: [{replica details}]}}
*/
Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys);
Collection<String> getNodes();
/**
* Get the current set of live nodes.
*/
Collection<String> getLiveNodes();
/**Get the collection-specific policy
ClusterState getClusterState() throws IOException;
Map<String, Object> getClusterProperties();
default <T> T getClusterProperty(String key, T defaultValue) {
T value = (T) getClusterProperties().get(key);
if (value == null)
return defaultValue;
return value;
}
AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws ConnectException, InterruptedException, IOException;
default AutoScalingConfig getAutoScalingConfig() throws ConnectException, InterruptedException, IOException {
return getAutoScalingConfig(null);
}
/**
* Get the collection-specific policy
*/
String getPolicyNameByCollection(String coll);
@Override
default void close() throws IOException {
}
// ZK-like methods
boolean hasData(String path) throws IOException;
List<String> listData(String path) throws NoSuchElementException, IOException;
class VersionedData {
public final int version;
public final byte[] data;
public VersionedData(int version, byte[] data) {
this.version = version;
this.data = data;
}
}
VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException;
default VersionedData getData(String path) throws NoSuchElementException, IOException {
return getData(path, null);
}
// mutators
void makePath(String path) throws IOException;
void createData(String path, byte[] data, CreateMode mode) throws IOException;
void removeData(String path, int version) throws NoSuchElementException, IOException;
void setData(String path, byte[] data, int version) throws NoSuchElementException, IOException;
List<OpResult> multi(final Iterable<Op> ops) throws IOException;
// Solr-like methods
SolrResponse request(SolrRequest req) throws IOException;
HttpClient getHttpClient();
interface DistributedQueueFactory {
DistributedQueue makeQueue(String path) throws IOException;
void removeQueue(String path) throws IOException;
}
DistributedQueueFactory getDistributedQueueFactory();
}
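An illustrative use of the ZK-like half of this interface (the path is hypothetical), including the optimistic update that VersionedData enables:
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
public class ProviderSketch {
  static void touchState(ClusterDataProvider cdp) throws IOException {
    String path = "/autoscaling/triggerState/my_trigger";  // hypothetical path
    if (!cdp.hasData(path)) {
      cdp.createData(path, new byte[0], CreateMode.PERSISTENT);
    }
    ClusterDataProvider.VersionedData vd = cdp.getData(path);
    cdp.setData(path, vd.data, vd.version);  // succeeds only if the version is unchanged since the read
  }
}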

View File

@ -0,0 +1,118 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.net.ConnectException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
/**
 * A {@link ClusterDataProvider} that forwards all methods to a delegate,
 * so that subclasses can override only what they need.
 */
public class DelegatingClusterDataProvider implements ClusterDataProvider {
protected ClusterDataProvider delegate;
public DelegatingClusterDataProvider(ClusterDataProvider delegate) {
this.delegate = delegate;
}
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return delegate.getNodeValues(node, tags);
}
@Override
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return delegate.getReplicaInfo(node, keys);
}
@Override
public Collection<String> getLiveNodes() {
return delegate.getLiveNodes();
}
@Override
public Map<String, Object> getClusterProperties() {
return delegate.getClusterProperties();
}
@Override
public ClusterState getClusterState() throws IOException {
return delegate.getClusterState();
}
@Override
public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws ConnectException, InterruptedException, IOException {
return delegate.getAutoScalingConfig(watcher);
}
@Override
public String getPolicyNameByCollection(String coll) {
return delegate.getPolicyNameByCollection(coll);
}
@Override
public boolean hasData(String path) throws IOException {
return delegate.hasData(path);
}
@Override
public List<String> listData(String path) throws NoSuchElementException, IOException {
return delegate.listData(path);
}
@Override
public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException {
return delegate.getData(path, watcher);
}
@Override
public void makePath(String path) throws IOException {
delegate.makePath(path);
}
@Override
public void createData(String path, byte[] data, CreateMode mode) throws IOException {
delegate.createData(path, data, mode);
}
@Override
public void removeData(String path, int version) throws NoSuchElementException, IOException {
delegate.removeData(path, version);
}
@Override
public void setData(String path, byte[] data, int version) throws NoSuchElementException, IOException {
delegate.setData(path, data, version);
}
@Override
public List<OpResult> multi(Iterable<Op> ops) throws IOException {
return delegate.multi(ops);
}
@Override
public SolrResponse request(SolrRequest req) throws IOException {
return delegate.request(req);
}
@Override
public HttpClient getHttpClient() {
return delegate.getHttpClient();
}
@Override
public DistributedQueueFactory getDistributedQueueFactory() {
return delegate.getDistributedQueueFactory();
}
}
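The purpose of this new class shows up in the PolicyHelper and TestPolicy hunks below: extend it, override only the methods under customization, and let everything else fall through to the wrapped delegate. A minimal sketch of the pattern (the helper and the policy name are hypothetical):

// Hypothetical: pin the policy for every collection, delegate everything else.
static ClusterDataProvider withFixedPolicy(ClusterDataProvider real, String policyName) {
  return new DelegatingClusterDataProvider(real) {
    @Override
    public String getPolicyNameByCollection(String coll) {
      return policyName;   // override just this one lookup
    }
  };
}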

View File

@ -201,7 +201,7 @@ public class Policy implements MapWriter {
}
Session(ClusterDataProvider dataProvider) {
this.nodes = new ArrayList<>(dataProvider.getNodes());
this.nodes = new ArrayList<>(dataProvider.getLiveNodes());
this.dataProvider = dataProvider;
for (String node : nodes) {
collections.addAll(dataProvider.getReplicaInfo(node, Collections.emptyList()).keySet());

View File

@ -49,22 +49,7 @@ public class PolicyHelper {
List<String> nodesList) {
List<ReplicaPosition> positions = new ArrayList<>();
final ClusterDataProvider delegate = cdp;
cdp = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return delegate.getNodeValues(node, tags);
}
@Override
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return delegate.getReplicaInfo(node, keys);
}
@Override
public Collection<String> getNodes() {
return delegate.getNodes();
}
cdp = new DelegatingClusterDataProvider(delegate) {
@Override
public String getPolicyNameByCollection(String coll) {
return policyMapping.get() != null && policyMapping.get().containsKey(coll) ?

View File

@ -46,7 +46,7 @@ public class Row implements MapWriter {
if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
this.node = node;
cells = new Cell[params.size()];
isLive = dataProvider.getNodes().contains(node);
isLive = dataProvider.getLiveNodes().contains(node);
Map<String, Object> vals = isLive ? dataProvider.getNodeValues(node, params) : Collections.emptyMap();
for (int i = 0; i < params.size(); i++) {
String s = params.get(i);

View File

@ -28,10 +28,15 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
@ -40,6 +45,7 @@ import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.cloud.rule.RemoteCallback;
@ -50,7 +56,12 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,16 +74,19 @@ public class SolrClientDataProvider implements ClusterDataProvider, MapWriter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final CloudSolrClient solrClient;
private final DistributedQueueFactory queueFactory;
private final ZkStateReader zkStateReader;
private final SolrZkClient zkClient;
private final Map<String, Map<String, Map<String, List<ReplicaInfo>>>> data = new HashMap<>();
private Set<String> liveNodes;
private Map<String, Object> snitchSession = new HashMap<>();
private Map<String, Map> nodeVsTags = new HashMap<>();
public SolrClientDataProvider(CloudSolrClient solrClient) {
public SolrClientDataProvider(DistributedQueueFactory queueFactory, CloudSolrClient solrClient) {
this.queueFactory = queueFactory;
this.solrClient = solrClient;
ZkStateReader zkStateReader = solrClient.getZkStateReader();
this.zkStateReader = solrClient.getZkStateReader();
this.zkClient = zkStateReader.getZkClient();
ClusterState clusterState = zkStateReader.getClusterState();
this.liveNodes = clusterState.getLiveNodes();
Map<String, ClusterState.CollectionRef> all = clusterState.getCollectionStates();
all.forEach((collName, ref) -> {
DocCollection coll = ref.get();
@ -107,16 +121,135 @@ public class SolrClientDataProvider implements ClusterDataProvider, MapWriter {
}
@Override
public Collection<String> getNodes() {
return liveNodes;
public Collection<String> getLiveNodes() {
return solrClient.getZkStateReader().getClusterState().getLiveNodes();
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put("liveNodes", liveNodes);
ew.put("liveNodes", zkStateReader.getClusterState().getLiveNodes());
ew.put("replicaInfo", Utils.getDeepCopy(data, 5));
ew.put("nodeValues", nodeVsTags);
}
@Override
public Map<String, Object> getClusterProperties() {
return zkStateReader.getClusterProperties();
}
@Override
public ClusterState getClusterState() {
return zkStateReader.getClusterState();
}
@Override
public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws IOException {
try {
return zkStateReader.getAutoScalingConfig(watcher);
} catch (KeeperException | InterruptedException e) {
throw new IOException(e);
}
}
@Override
public boolean hasData(String path) throws IOException {
try {
return zkClient.exists(path, true);
} catch (KeeperException | InterruptedException e) {
throw new IOException(e);
}
}
@Override
public List<String> listData(String path) throws NoSuchElementException, IOException {
try {
return zkClient.getChildren(path, null, true);
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException(path);
} catch (KeeperException | InterruptedException e) {
throw new IOException(e);
}
}
@Override
public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException {
Stat stat = new Stat();
try {
byte[] bytes = zkClient.getData(path, watcher, stat, true);
return new VersionedData(stat.getVersion(), bytes);
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException(path);
} catch (KeeperException | InterruptedException e) {
throw new IOException(e);
}
}
@Override
public void makePath(String path) throws IOException {
try {
zkClient.makePath(path, true);
} catch (KeeperException | InterruptedException e) {
throw new IOException(e);
}
}
@Override
public void createData(String path, byte[] data, CreateMode mode) throws IOException {
try {
zkClient.create(path, data, mode, true);
} catch (KeeperException | InterruptedException e) {
throw new IOException(e);
}
}
@Override
public void removeData(String path, int version) throws NoSuchElementException, IOException {
try {
zkClient.delete(path, version, true);
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException(path);
} catch (KeeperException | InterruptedException e) {
throw new IOException(e);
}
}
@Override
public void setData(String path, byte[] data, int version) throws NoSuchElementException, IOException {
try {
zkClient.setData(path, data, version, true);
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException(path);
} catch (KeeperException | InterruptedException e) {
throw new IOException(e);
}
}
@Override
public SolrResponse request(SolrRequest req) throws IOException {
try {
return req.process(solrClient);
} catch (SolrServerException e) {
throw new IOException(e);
}
}
@Override
public List<OpResult> multi(Iterable<Op> ops) throws IOException {
try {
return zkClient.multi(ops, true);
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public HttpClient getHttpClient() {
return solrClient.getHttpClient();
}
@Override
public DistributedQueueFactory getDistributedQueueFactory() {
return queueFactory;
}
static class ClientSnitchCtx

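Two idioms repeat through the new methods above: KeeperException.NoNodeException is translated to NoSuchElementException, and every other KeeperException or InterruptedException is wrapped in an IOException, so callers of ClusterDataProvider never see ZooKeeper types. The constructor change also means call sites must now supply a DistributedQueueFactory. A condensed, hypothetical call site (the ZK address and path are placeholders, and the factory is assumed to be provided elsewhere in this change):

// Hypothetical call site, not part of this commit.
static ClusterDataProvider connect(ClusterDataProvider.DistributedQueueFactory queueFactory)
    throws IOException {
  CloudSolrClient client = new CloudSolrClient.Builder()
      .withZkHost("localhost:2181")      // placeholder ZK address
      .build();
  ClusterDataProvider cdp = new SolrClientDataProvider(queueFactory, client);
  if (!cdp.hasData("/some/path")) {      // ZK failures surface as IOException
    cdp.makePath("/some/path");
  }
  return cdp;
}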
View File

@ -27,7 +27,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.google.common.collect.ImmutableList;
import org.apache.solr.SolrTestCaseJ4;
@ -422,7 +421,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
});
});
return new ClusterDataProvider(){
return new DelegatingClusterDataProvider(null) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return (Map<String, Object>) Utils.getObjectByPath(m,false, Arrays.asList("nodeValues", node));
@ -434,7 +433,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Override
public Collection<String> getNodes() {
public Collection<String> getLiveNodes() {
return (Collection<String>) m.get("liveNodes");
}
@ -963,7 +962,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoscaleJson));
ClusterDataProvider clusterDataProvider = getClusterDataProvider(nodeValues, clusterState);
ClusterDataProvider cdp = new ClusterDataProvider() {
ClusterDataProvider cdp = new DelegatingClusterDataProvider(null) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return clusterDataProvider.getNodeValues(node, tags);
@ -975,8 +974,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Override
public Collection<String> getNodes() {
return clusterDataProvider.getNodes();
public Collection<String> getLiveNodes() {
return clusterDataProvider.getLiveNodes();
}
@Override
@ -1041,7 +1040,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
" 'freedisk':918005641216}}}");
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoscaleJson));
Policy.Session session = policy.createSession(new ClusterDataProvider() {
Policy.Session session = policy.createSession(new DelegatingClusterDataProvider(null) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return tagsMap.get(node);
@ -1053,7 +1052,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Override
public Collection<String> getNodes() {
public Collection<String> getLiveNodes() {
return replicaInfoMap.keySet();
}
@ -1099,7 +1098,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
"}");
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
ClusterDataProvider clusterDataProvider = getClusterDataProvider(nodeValues, clusterState);
ClusterDataProvider cdp = new ClusterDataProvider() {
ClusterDataProvider cdp = new DelegatingClusterDataProvider(null) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return clusterDataProvider.getNodeValues(node, tags);
@ -1111,8 +1110,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Override
public Collection<String> getNodes() {
return clusterDataProvider.getNodes();
public Collection<String> getLiveNodes() {
return clusterDataProvider.getLiveNodes();
}
@Override
@ -1131,7 +1130,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
private ClusterDataProvider getClusterDataProvider(final Map<String, Map> nodeValues, String clusterState) {
return new ClusterDataProvider() {
return new DelegatingClusterDataProvider(null) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
Map<String, Object> result = new LinkedHashMap<>();
@ -1140,7 +1139,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Override
public Collection<String> getNodes() {
public Collection<String> getLiveNodes() {
return nodeValues.keySet();
}
@ -1168,7 +1167,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
" '127.0.0.1:50096_solr':{" +
" 'cores':0," +
" 'port':'50096'}}");
ClusterDataProvider dataProvider = new ClusterDataProvider() {
ClusterDataProvider dataProvider = new DelegatingClusterDataProvider(null) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
Map<String, Object> result = new LinkedHashMap<>();
@ -1187,7 +1186,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Override
public Collection<String> getNodes() {
public Collection<String> getLiveNodes() {
return Arrays.asList( "127.0.0.1:50097_solr", "127.0.0.1:50096_solr");
}
};
@ -1225,7 +1224,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
"node4:{cores:0, freedisk: 900, heap:16900, nodeRole:overseer, sysprop.rack:rack2}" +
"}");
ClusterDataProvider dataProvider = new ClusterDataProvider() {
ClusterDataProvider dataProvider = new DelegatingClusterDataProvider(null) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
Map<String, Object> result = new LinkedHashMap<>();
@ -1244,7 +1243,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Override
public Collection<String> getNodes() {
public Collection<String> getLiveNodes() {
return Arrays.asList("node1", "node2", "node3", "node4");
}
};
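Every TestPolicy hunk above is the same two-part mechanical change: anonymous ClusterDataProvider implementations become DelegatingClusterDataProvider(null) subclasses, so methods the test does not exercise no longer need empty bodies, and getNodes() is renamed to getLiveNodes(). Condensed into one illustrative stub, as it would sit inside a test method:

// Illustrative stub following the pattern above; node names are made up.
// Null delegate: any call not overridden here would NPE, matching the tests' intent.
ClusterDataProvider stub = new DelegatingClusterDataProvider(null) {
  @Override
  public Collection<String> getLiveNodes() {
    return Arrays.asList("node1", "node2");
  }
  @Override
  public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
    return Collections.emptyMap();   // no node metrics needed for this sketch
  }
  @Override
  public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
    return Collections.emptyMap();
  }
};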