mirror of https://github.com/apache/lucene.git
SOLR-10397: Pass CloudConfig from Overseer to the OverseerTriggerThread. Avoid throwing exceptions on session expiry if thread has been closed already. Abort earlier if thread has been closed.
This commit is contained in:
parent
bd08ca0918
commit
61fe5248b0
|
@ -473,8 +473,6 @@ public class Overseer implements Closeable {
|
|||
|
||||
private OverseerThread triggerThread;
|
||||
|
||||
private Thread autoscalingTriggerCreator;
|
||||
|
||||
private final ZkStateReader reader;
|
||||
|
||||
private final ShardHandler shardHandler;
|
||||
|
@ -526,7 +524,7 @@ public class Overseer implements Closeable {
|
|||
ccThread.setDaemon(true);
|
||||
|
||||
ThreadGroup triggerThreadGroup = new ThreadGroup("Overseer autoscaling triggers");
|
||||
OverseerTriggerThread trigger = new OverseerTriggerThread(zkController);
|
||||
OverseerTriggerThread trigger = new OverseerTriggerThread(zkController, config);
|
||||
triggerThread = new OverseerThread(triggerThreadGroup, trigger, "OverseerAutoScalingTriggerThread-" + id);
|
||||
|
||||
updaterThread.start();
|
||||
|
@ -576,10 +574,6 @@ public class Overseer implements Closeable {
|
|||
IOUtils.closeQuietly(triggerThread);
|
||||
triggerThread.interrupt();
|
||||
}
|
||||
if (autoscalingTriggerCreator != null) {
|
||||
autoscalingTriggerCreator.interrupt();
|
||||
}
|
||||
|
||||
if (updaterThread != null) {
|
||||
try {
|
||||
updaterThread.join();
|
||||
|
@ -595,17 +589,9 @@ public class Overseer implements Closeable {
|
|||
triggerThread.join();
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
if (autoscalingTriggerCreator != null) {
|
||||
try {
|
||||
log.info("Waiting for autoscaling trigger creator join");
|
||||
autoscalingTriggerCreator.join();
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
|
||||
updaterThread = null;
|
||||
ccThread = null;
|
||||
triggerThread = null;
|
||||
autoscalingTriggerCreator = null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -57,6 +57,8 @@ public class OverseerTriggerThread implements Runnable, Closeable {
|
|||
|
||||
private final ZkController zkController;
|
||||
|
||||
private final CloudConfig cloudConfig;
|
||||
|
||||
private final ZkStateReader zkStateReader;
|
||||
|
||||
private final SolrZkClient zkClient;
|
||||
|
@ -80,8 +82,9 @@ public class OverseerTriggerThread implements Runnable, Closeable {
|
|||
|
||||
private AutoScalingConfig autoScalingConfig;
|
||||
|
||||
public OverseerTriggerThread(ZkController zkController) {
|
||||
public OverseerTriggerThread(ZkController zkController, CloudConfig cloudConfig) {
|
||||
this.zkController = zkController;
|
||||
this.cloudConfig = cloudConfig;
|
||||
zkStateReader = zkController.getZkStateReader();
|
||||
zkClient = zkController.getZkClient();
|
||||
scheduledTriggers = new ScheduledTriggers(zkController);
|
||||
|
@ -108,7 +111,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
|
|||
int lastZnodeVersion = znodeVersion;
|
||||
|
||||
// we automatically add a trigger for auto add replicas if it does not exists already
|
||||
while (true) {
|
||||
while (!isClosed) {
|
||||
try {
|
||||
AutoScalingConfig autoScalingConfig = zkStateReader.getAutoScalingConfig();
|
||||
AutoScalingConfig withAutoAddReplicasTrigger = withAutoAddReplicasTrigger(autoScalingConfig);
|
||||
|
@ -124,17 +127,22 @@ public class OverseerTriggerThread implements Runnable, Closeable {
|
|||
log.warn("Interrupted", e);
|
||||
} catch (KeeperException e) {
|
||||
log.error("A ZK error has occurred", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isClosed || Thread.currentThread().isInterrupted()) return;
|
||||
|
||||
try {
|
||||
refreshAutoScalingConf(new AutoScalingWatcher());
|
||||
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
|
||||
log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
|
||||
} catch (KeeperException e) {
|
||||
log.error("A ZK error has occurred", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
|
||||
if (!isClosed) {
|
||||
// throw exception only if we haven't been closed already
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
|
||||
}
|
||||
return; // silently!
|
||||
} catch (InterruptedException e) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -327,7 +335,6 @@ public class OverseerTriggerThread implements Runnable, Closeable {
|
|||
}
|
||||
|
||||
private AutoScalingConfig withAutoAddReplicasTrigger(AutoScalingConfig autoScalingConfig) {
|
||||
CloudConfig cloudConfig = zkController.getCoreContainer().getConfig().getCloudConfig();
|
||||
Map<String, Object> triggerProps = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_PROPS;
|
||||
String triggerName = (String) triggerProps.get("name");
|
||||
Map<String, AutoScalingConfig.TriggerConfig> configs = autoScalingConfig.getTriggerConfigs();
|
||||
|
|
Loading…
Reference in New Issue