mirror of https://github.com/apache/lucene.git
SOLR-10397: Create auto add replicas trigger directly in ZK instead of using API
This commit is contained in:
parent
cda1d2ee1a
commit
bd08ca0918
|
@ -30,8 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.cloud.autoscaling.AutoScaling;
|
||||
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
|
||||
import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
|
||||
import org.apache.solr.cloud.overseer.ClusterStateMutator;
|
||||
import org.apache.solr.cloud.overseer.CollectionMutator;
|
||||
|
@ -41,22 +39,17 @@ import org.apache.solr.cloud.overseer.ReplicaMutator;
|
|||
import org.apache.solr.cloud.overseer.SliceMutator;
|
||||
import org.apache.solr.cloud.overseer.ZkStateWriter;
|
||||
import org.apache.solr.cloud.overseer.ZkWriteCommand;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.ContentStreamBase;
|
||||
import org.apache.solr.common.util.IOUtils;
|
||||
import org.apache.solr.common.util.ObjectReleaseTracker;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CloudConfig;
|
||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
import org.apache.solr.update.UpdateShardHandler;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -532,11 +525,6 @@ public class Overseer implements Closeable {
|
|||
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
|
||||
ccThread.setDaemon(true);
|
||||
|
||||
//TODO nocommit, autoscaling framework should start autoAddReplicas trigger automatically (implicitly)
|
||||
autoscalingTriggerCreator = new Thread(createAutoscalingTriggerIfNotExist(), "AutoscalingTriggerCreator");
|
||||
autoscalingTriggerCreator.setDaemon(true);
|
||||
autoscalingTriggerCreator.start();
|
||||
|
||||
ThreadGroup triggerThreadGroup = new ThreadGroup("Overseer autoscaling triggers");
|
||||
OverseerTriggerThread trigger = new OverseerTriggerThread(zkController);
|
||||
triggerThread = new OverseerThread(triggerThreadGroup, trigger, "OverseerAutoScalingTriggerThread-" + id);
|
||||
|
@ -574,50 +562,6 @@ public class Overseer implements Closeable {
|
|||
assert ObjectReleaseTracker.release(this);
|
||||
}
|
||||
|
||||
private Runnable createAutoscalingTriggerIfNotExist() {
|
||||
return new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
boolean triggerExist = getZkStateReader().getAutoScalingConfig()
|
||||
.getTriggerConfigs().get(".auto_add_replicas") != null;
|
||||
if (triggerExist) return;
|
||||
} catch (InterruptedException | KeeperException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Failed when creating .auto_add_replicas trigger");
|
||||
}
|
||||
try {
|
||||
while (getZkController().getCoreContainer().getRequestHandler(AutoScalingHandler.HANDLER_PATH) == null) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (getZkController().getCoreContainer().isShutDown()) {
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {}
|
||||
|
||||
String dsl = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL.replace("{{waitFor}}",
|
||||
String.valueOf(config.getAutoReplicaFailoverWaitAfterExpiration()/1000));
|
||||
LocalSolrQueryRequest request = new LocalSolrQueryRequest(null, new ModifiableSolrParams());
|
||||
request.getContext().put("httpMethod", "POST");
|
||||
request.setContentStreams(Collections.singleton(new ContentStreamBase.StringStream(dsl)));
|
||||
SolrQueryResponse response = new SolrQueryResponse();
|
||||
try {
|
||||
getZkController().getCoreContainer()
|
||||
.getRequestHandler(AutoScalingHandler.HANDLER_PATH).handleRequest(request, response);
|
||||
if (!"success".equals(response.getValues().get("result"))) {
|
||||
log.error("Failed when creating .auto_add_replicas trigger, return {}",response);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed when creating .auto_add_replicas trigger ", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void doClose() {
|
||||
|
||||
if (updaterThread != null) {
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Map;
|
|||
import com.google.common.base.Preconditions;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
|
||||
public class AutoScaling {
|
||||
|
@ -144,11 +145,10 @@ public class AutoScaling {
|
|||
}
|
||||
|
||||
public static final String AUTO_ADD_REPLICAS_TRIGGER_DSL =
|
||||
"{" +
|
||||
" 'set-trigger' : {" +
|
||||
" {" +
|
||||
" 'name' : '.auto_add_replicas'," +
|
||||
" 'event' : 'nodeLost'," +
|
||||
" 'waitFor' : '{{waitFor}}s'," +
|
||||
" 'waitFor' : -1," +
|
||||
" 'enabled' : true," +
|
||||
" 'actions' : [" +
|
||||
" {" +
|
||||
|
@ -160,6 +160,7 @@ public class AutoScaling {
|
|||
" 'class':'solr.ExecutePlanAction'" +
|
||||
" }" +
|
||||
" ]" +
|
||||
" }" +
|
||||
"}";
|
||||
" }";
|
||||
|
||||
/** Properties of the auto-add-replicas trigger, obtained by parsing the {@link #AUTO_ADD_REPLICAS_TRIGGER_DSL} JSON string. */
public static final Map<String, Object> AUTO_ADD_REPLICAS_TRIGGER_PROPS = (Map) Utils.fromJSONString(AUTO_ADD_REPLICAS_TRIGGER_DSL);
|
||||
}
|
||||
|
|
|
@ -542,7 +542,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
|
|||
private static String fullName = SystemLogListener.class.getName();
|
||||
private static String solrName = "solr." + SystemLogListener.class.getSimpleName();
|
||||
|
||||
private static AutoScalingConfig withSystemLogListener(AutoScalingConfig autoScalingConfig, String triggerName) {
|
||||
static AutoScalingConfig withSystemLogListener(AutoScalingConfig autoScalingConfig, String triggerName) {
|
||||
Map<String, AutoScalingConfig.TriggerListenerConfig> configs = autoScalingConfig.getTriggerListenerConfigs();
|
||||
for (AutoScalingConfig.TriggerListenerConfig cfg : configs.values()) {
|
||||
if (triggerName.equals(cfg.trigger)) {
|
||||
|
|
|
@ -37,13 +37,16 @@ import org.apache.solr.common.cloud.SolrZkClient;
|
|||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.cloud.ZooKeeperException;
|
||||
import org.apache.solr.common.util.IOUtils;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CloudConfig;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||
|
||||
/**
|
||||
* Overseer thread responsible for reading triggers from zookeeper and
|
||||
* adding/removing them from {@link ScheduledTriggers}
|
||||
|
@ -104,6 +107,27 @@ public class OverseerTriggerThread implements Runnable, Closeable {
|
|||
public void run() {
|
||||
int lastZnodeVersion = znodeVersion;
|
||||
|
||||
// we automatically add a trigger for auto add replicas if it does not exist already
|
||||
while (true) {
|
||||
try {
|
||||
AutoScalingConfig autoScalingConfig = zkStateReader.getAutoScalingConfig();
|
||||
AutoScalingConfig withAutoAddReplicasTrigger = withAutoAddReplicasTrigger(autoScalingConfig);
|
||||
if (withAutoAddReplicasTrigger.equals(autoScalingConfig)) break;
|
||||
log.debug("Adding .autoAddReplicas trigger");
|
||||
zkClient.setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(withAutoAddReplicasTrigger), withAutoAddReplicasTrigger.getZkVersion(), true);
|
||||
break;
|
||||
} catch (KeeperException.BadVersionException bve) {
|
||||
// somebody else has changed the configuration so we must retry
|
||||
} catch (InterruptedException e) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
log.warn("Interrupted", e);
|
||||
} catch (KeeperException e) {
|
||||
log.error("A ZK error has occurred", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
refreshAutoScalingConf(new AutoScalingWatcher());
|
||||
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
|
||||
|
@ -302,6 +326,26 @@ public class OverseerTriggerThread implements Runnable, Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private AutoScalingConfig withAutoAddReplicasTrigger(AutoScalingConfig autoScalingConfig) {
|
||||
CloudConfig cloudConfig = zkController.getCoreContainer().getConfig().getCloudConfig();
|
||||
Map<String, Object> triggerProps = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_PROPS;
|
||||
String triggerName = (String) triggerProps.get("name");
|
||||
Map<String, AutoScalingConfig.TriggerConfig> configs = autoScalingConfig.getTriggerConfigs();
|
||||
for (AutoScalingConfig.TriggerConfig cfg : configs.values()) {
|
||||
if (triggerName.equals(cfg.name)) {
|
||||
// already has this trigger
|
||||
return autoScalingConfig;
|
||||
}
|
||||
}
|
||||
// need to add
|
||||
triggerProps.computeIfPresent("waitFor", (k, v) -> (long) (cloudConfig.getAutoReplicaFailoverWaitAfterExpiration() / 1000));
|
||||
AutoScalingConfig.TriggerConfig config = new AutoScalingConfig.TriggerConfig(triggerName, triggerProps);
|
||||
autoScalingConfig = autoScalingConfig.withTriggerConfig(config);
|
||||
// need to add SystemLogListener explicitly here
|
||||
autoScalingConfig = AutoScalingHandler.withSystemLogListener(autoScalingConfig, triggerName);
|
||||
return autoScalingConfig;
|
||||
}
|
||||
|
||||
private static Map<String, AutoScaling.Trigger> loadTriggers(AutoScaling.TriggerFactory triggerFactory, AutoScalingConfig autoScalingConfig) {
|
||||
Map<String, AutoScalingConfig.TriggerConfig> triggers = autoScalingConfig.getTriggerConfigs();
|
||||
if (triggers == null) {
|
||||
|
|
|
@ -36,12 +36,14 @@ import org.apache.solr.common.params.CollectionParams;
|
|||
import org.apache.solr.common.params.MapSolrParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
|
||||
public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase {
|
||||
private static final String COLLECTION1 = "testSimple1";
|
||||
private static final String COLLECTION2 = "testSimple2";
|
||||
|
|
Loading…
Reference in New Issue