mirror of
https://github.com/apache/lucene.git
synced 2025-03-07 00:39:21 +00:00
SOLR-6533 Added a testcase for config reload, hardened watching for changes
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1640857 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8a24e56be5
commit
c54d9a8f7f
@ -41,7 +41,6 @@ import java.util.Set;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
@ -1222,9 +1221,9 @@ public final class ZkController {
|
||||
overseerJobQueue.offer(ZkStateReader.toJSON(m));
|
||||
|
||||
if(configLocation != null) {
|
||||
synchronized (confDirectoryWatchers) {
|
||||
synchronized (confDirectoryListeners) {
|
||||
log.info("This conf directory is no more watched {0}",configLocation);
|
||||
confDirectoryWatchers.remove(configLocation);
|
||||
confDirectoryListeners.remove(configLocation);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2115,29 +2114,42 @@ public final class ZkController {
|
||||
*
|
||||
* @return true on success
|
||||
*/
|
||||
public static boolean persistConfigResourceToZooKeeper( SolrResourceLoader loader, int znodeVersion , String resourceName, byte[] content, boolean createIfNotExists) {
|
||||
public static boolean persistConfigResourceToZooKeeper( SolrResourceLoader loader, int znodeVersion ,
|
||||
String resourceName, byte[] content,
|
||||
boolean createIfNotExists) {
|
||||
final ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)loader;
|
||||
final ZkController zkController = zkLoader.getZkController();
|
||||
final SolrZkClient zkClient = zkController.getZkClient();
|
||||
final String resourceLocation = zkLoader.getConfigSetZkPath() + "/" + resourceName;
|
||||
String errMsg = "Failed to persist resource at {0} - version mismatch";
|
||||
String errMsg = "Failed to persist resource at {0} - version mismatch {1}";
|
||||
try {
|
||||
try {
|
||||
zkClient.setData(resourceLocation , content,znodeVersion, true);
|
||||
zkClient.setData(zkLoader.getConfigSetZkPath(),new byte[]{0},true);
|
||||
} catch (NoNodeException e) {
|
||||
if(createIfNotExists){
|
||||
try {
|
||||
zkClient.create(resourceLocation,content, CreateMode.PERSISTENT,true);
|
||||
zkClient.setData(zkLoader.getConfigSetZkPath(), new byte[]{0}, true);
|
||||
} catch (KeeperException.NodeExistsException nee) {
|
||||
log.info(MessageFormat.format(errMsg,resourceLocation));
|
||||
throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg,resourceLocation) + ", retry.");
|
||||
try {
|
||||
Stat stat = zkClient.exists(resourceLocation, null, true);
|
||||
log.info("failed to set data version in zk is {} and expected version is {} ", stat.getVersion(),znodeVersion);
|
||||
} catch (Exception e1) {
|
||||
log.warn("could not get stat");
|
||||
}
|
||||
|
||||
log.info(MessageFormat.format(errMsg,resourceLocation,znodeVersion));
|
||||
throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg,resourceLocation,znodeVersion) + ", retry.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (KeeperException.BadVersionException bve){
|
||||
log.info(MessageFormat.format(errMsg,resourceLocation));
|
||||
throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg,resourceLocation) + ", retry.");
|
||||
throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg,resourceLocation,znodeVersion) + ", retry.");
|
||||
}catch (ResourceModifiedInZkException e){
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt(); // Restore the interrupted status
|
||||
@ -2157,9 +2169,13 @@ public final class ZkController {
|
||||
|
||||
public void unRegisterConfListener(Runnable listener) {
|
||||
if(listener == null) return;
|
||||
synchronized (confDirectoryWatchers){
|
||||
for (Set<Runnable> runnables : confDirectoryWatchers.values()) {
|
||||
if(runnables != null) runnables.remove(listener);
|
||||
synchronized (confDirectoryListeners){
|
||||
for (Set<Runnable> listeners : confDirectoryListeners.values()) {
|
||||
if(listeners != null) {
|
||||
if(listeners.remove(listener)) {
|
||||
log.info(" a listener was removed because of core close");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -2172,9 +2188,9 @@ public final class ZkController {
|
||||
*/
|
||||
public void registerConfListenerForCore(String confDir,SolrCore core, final Runnable listener){
|
||||
if(listener==null) throw new NullPointerException("listener cannot be null");
|
||||
synchronized (confDirectoryWatchers){
|
||||
if(confDirectoryWatchers.containsKey(confDir)){
|
||||
confDirectoryWatchers.get(confDir).add(listener);
|
||||
synchronized (confDirectoryListeners){
|
||||
if(confDirectoryListeners.containsKey(confDir)){
|
||||
confDirectoryListeners.get(confDir).add(listener);
|
||||
core.addCloseHook(new CloseHook() {
|
||||
@Override
|
||||
public void preClose(SolrCore core) {
|
||||
@ -2184,69 +2200,76 @@ public final class ZkController {
|
||||
@Override
|
||||
public void postClose(SolrCore core) { }
|
||||
});
|
||||
|
||||
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,"This conf directory is not valid");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String , Set<Runnable>> confDirectoryWatchers = new HashMap<>();
|
||||
void watchZKConfDir(final String zkDir) {
|
||||
private final Map<String , Set<Runnable>> confDirectoryListeners = new HashMap<>();
|
||||
|
||||
if(!confDirectoryWatchers.containsKey(zkDir)){
|
||||
confDirectoryWatchers.put(zkDir,new HashSet<Runnable>());
|
||||
}else{
|
||||
//it's already watched
|
||||
return;
|
||||
}
|
||||
void watchZKConfDir(final String zkDir) {
|
||||
log.info("watch zkdir " + zkDir);
|
||||
if (!confDirectoryListeners.containsKey(zkDir)) {
|
||||
confDirectoryListeners.put(zkDir, new HashSet<Runnable>());
|
||||
setConfWatcher(zkDir, new WatcherImpl(zkDir));
|
||||
|
||||
Watcher watcher = new Watcher() {
|
||||
@Override
|
||||
public void process(WatchedEvent event) {
|
||||
try {
|
||||
synchronized (confDirectoryWatchers) {
|
||||
// if this is not among directories to be watched then don't set the watcher anymore
|
||||
if(!confDirectoryWatchers.containsKey(zkDir)) return;
|
||||
}
|
||||
}
|
||||
|
||||
if (event.getType() == Event.EventType.NodeChildrenChanged) {
|
||||
synchronized (confDirectoryWatchers) {
|
||||
final Set<Runnable> listeners = confDirectoryWatchers.get(zkDir);
|
||||
if (listeners != null) {
|
||||
new Thread() {
|
||||
@Override
|
||||
public synchronized void run() {
|
||||
//running in a separate thread so that the zk event thread is not
|
||||
// unnecessarily held up
|
||||
for (Runnable listener : listeners) listener.run();
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} finally {
|
||||
if (Event.EventType.None.equals(event.getType())) {
|
||||
}
|
||||
private class WatcherImpl implements Watcher{
|
||||
private final String zkDir ;
|
||||
|
||||
private WatcherImpl(String dir) {
|
||||
this.zkDir = dir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(WatchedEvent event) {
|
||||
try {
|
||||
|
||||
synchronized (confDirectoryListeners) {
|
||||
// if this is not among directories to be watched then don't set the watcher anymore
|
||||
if( !confDirectoryListeners.containsKey(zkDir)) {
|
||||
log.info("Watcher on {} is removed ", zkDir);
|
||||
return;
|
||||
} else {
|
||||
setConfWatcher(zkDir,this);
|
||||
}
|
||||
final Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
|
||||
if (listeners != null && !listeners.isEmpty()) {
|
||||
new Thread() {
|
||||
//run these in a separate thread because this can be long running
|
||||
public void run() {
|
||||
for (final Runnable listener : listeners)
|
||||
try {
|
||||
listener.run();
|
||||
} catch (Exception e) {
|
||||
log.warn("listener throws error", e);
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} finally {
|
||||
if (Event.EventType.None.equals(event.getType())) {
|
||||
log.info("A node got unwatched for {}", zkDir);
|
||||
return;
|
||||
} else {
|
||||
setConfWatcher(zkDir,this);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
setConfWatcher(zkDir,watcher);
|
||||
}
|
||||
}
|
||||
|
||||
private void setConfWatcher(String zkDir, Watcher watcher) {
|
||||
try {
|
||||
zkClient.getChildren(zkDir,watcher,true);
|
||||
zkClient.exists(zkDir,watcher,true);
|
||||
} catch (KeeperException e) {
|
||||
log.error("failed to set watcher for conf dir {} ", zkDir);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("failed to set watcher for conf dir {} ", zkDir);
|
||||
}
|
||||
}
|
||||
@ -2255,11 +2278,10 @@ public final class ZkController {
|
||||
return new OnReconnect() {
|
||||
@Override
|
||||
public void command() {
|
||||
synchronized (confDirectoryWatchers){
|
||||
for (String s : confDirectoryWatchers.keySet()) {
|
||||
synchronized (confDirectoryListeners){
|
||||
for (String s : confDirectoryListeners.keySet()) {
|
||||
watchZKConfDir(s);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -111,10 +111,6 @@ public class ZkSolrResourceLoader extends SolrResourceLoader {
|
||||
|
||||
}
|
||||
|
||||
public ZkByteArrayInputStream(byte[] buf, int offset, int length, Stat stat) {
|
||||
super(buf, offset, length);
|
||||
this.stat = stat;
|
||||
}
|
||||
public Stat getStat(){
|
||||
return stat;
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.solr.core;
|
||||
|
||||
import org.apache.lucene.util.Version;
|
||||
import org.apache.solr.cloud.ZkSolrResourceLoader;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.util.DOMUtil;
|
||||
import org.apache.solr.util.SystemIdResolver;
|
||||
@ -48,6 +49,7 @@ import javax.xml.xpath.XPathExpressionException;
|
||||
import javax.xml.xpath.XPathFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.text.ParseException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
@ -74,6 +76,7 @@ public class Config {
|
||||
private final String prefix;
|
||||
private final String name;
|
||||
private final SolrResourceLoader loader;
|
||||
private int zkVersion = -1;
|
||||
|
||||
/**
|
||||
* Builds a config from a resource name with no xpath prefix.
|
||||
@ -113,9 +116,14 @@ public class Config {
|
||||
this.prefix = (prefix != null && !prefix.endsWith("/"))? prefix + '/' : prefix;
|
||||
try {
|
||||
javax.xml.parsers.DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
|
||||
|
||||
|
||||
if (is == null) {
|
||||
is = new InputSource(loader.openConfig(name));
|
||||
InputStream in = loader.openConfig(name);
|
||||
if (in instanceof ZkSolrResourceLoader.ZkByteArrayInputStream) {
|
||||
zkVersion = ((ZkSolrResourceLoader.ZkByteArrayInputStream) in).getStat().getVersion();
|
||||
log.info("loaded config {} with version {} ",name,zkVersion);
|
||||
}
|
||||
is = new InputSource(in);
|
||||
is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(name));
|
||||
}
|
||||
|
||||
@ -464,6 +472,12 @@ public class Config {
|
||||
return version;
|
||||
}
|
||||
|
||||
/**If this config is loaded from zk the version is relevant other wise -1 is returned
|
||||
*/
|
||||
public int getZnodeVersion(){
|
||||
return zkVersion;
|
||||
}
|
||||
|
||||
public Config getOriginalConfig() {
|
||||
return new Config(loader, null, origDoc);
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ import org.noggit.JSONParser;
|
||||
import org.noggit.JSONWriter;
|
||||
import org.noggit.ObjectBuilder;
|
||||
|
||||
public class ConfigOverlay {
|
||||
public class ConfigOverlay implements MapSerializable{
|
||||
private final int znodeVersion ;
|
||||
private Map<String, Object> data;
|
||||
private Map<String,Object> props;
|
||||
@ -177,13 +177,6 @@ public class ConfigOverlay {
|
||||
return out.toString();
|
||||
}
|
||||
|
||||
public Map toOutputFormat() {
|
||||
Map result = new LinkedHashMap();
|
||||
result.put("version",znodeVersion);
|
||||
result.putAll(data);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
public static final String RESOURCE_NAME = "configoverlay.json";
|
||||
|
||||
@ -254,4 +247,12 @@ public class ConfigOverlay {
|
||||
public Map<String, Object> getUserProps() {
|
||||
return userProps;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> toMap() {
|
||||
Map result = new LinkedHashMap();
|
||||
result.put("znodeVersion",znodeVersion);
|
||||
result.putAll(data);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -328,7 +328,7 @@ public class SolrConfig extends Config implements MapSerializable{
|
||||
}
|
||||
}
|
||||
|
||||
private static ConfigOverlay getConfigOverlay(SolrResourceLoader loader) {
|
||||
public static ConfigOverlay getConfigOverlay(SolrResourceLoader loader) {
|
||||
InputStream in = null;
|
||||
try {
|
||||
in = loader.openResource(ConfigOverlay.RESOURCE_NAME);
|
||||
@ -712,6 +712,7 @@ public class SolrConfig extends Config implements MapSerializable{
|
||||
@Override
|
||||
public Map<String, Object> toMap() {
|
||||
LinkedHashMap result = new LinkedHashMap();
|
||||
if(getZnodeVersion() > -1) result.put("znodeVersion",getZnodeVersion());
|
||||
result.put("luceneMatchVersion",luceneMatchVersion);
|
||||
result.put("updateHandler", getUpdateHandlerInfo().toMap());
|
||||
Map m = new LinkedHashMap();
|
||||
@ -780,6 +781,7 @@ public class SolrConfig extends Config implements MapSerializable{
|
||||
public ConfigOverlay getOverlay() {
|
||||
if(overlay ==null) {
|
||||
overlay = getConfigOverlay(getResourceLoader());
|
||||
log.info("$$$overlay_version "+ overlay.getZnodeVersion());
|
||||
}
|
||||
return overlay;
|
||||
}
|
||||
|
@ -19,8 +19,7 @@ package org.apache.solr.handler;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.text.MessageFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@ -28,30 +27,23 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.lucene.analysis.util.ResourceLoader;
|
||||
import org.apache.lucene.analysis.util.ResourceLoaderAware;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.cloud.ZkSolrResourceLoader;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.MapSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.ContentStream;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.core.CloseHook;
|
||||
import org.apache.solr.core.ConfigOverlay;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.PluginInfo;
|
||||
import org.apache.solr.core.SolrConfig;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.SolrInfoMBean;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.request.SolrRequestHandler;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
@ -94,30 +86,60 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
|
||||
public void inform(final SolrCore core) {
|
||||
if( ! (core.getResourceLoader() instanceof ZkSolrResourceLoader)) return;
|
||||
final ZkSolrResourceLoader zkSolrResourceLoader = (ZkSolrResourceLoader) core.getResourceLoader();
|
||||
if(zkSolrResourceLoader != null){
|
||||
Runnable listener = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if(core.isClosed()) return;
|
||||
Stat stat = zkSolrResourceLoader.getZkController().getZkClient().exists((zkSolrResourceLoader).getConfigSetZkPath() + "/" + ConfigOverlay.RESOURCE_NAME, null, true);
|
||||
if(stat == null) return;
|
||||
if (stat.getVersion() > core.getSolrConfig().getOverlay().getZnodeVersion()) {
|
||||
core.getCoreDescriptor().getCoreContainer().reload(core.getName());
|
||||
if(zkSolrResourceLoader != null)
|
||||
zkSolrResourceLoader.getZkController().registerConfListenerForCore(
|
||||
zkSolrResourceLoader.getConfigSetZkPath(),
|
||||
core,
|
||||
getListener(core, zkSolrResourceLoader));
|
||||
|
||||
}
|
||||
|
||||
private static Runnable getListener(SolrCore core, ZkSolrResourceLoader zkSolrResourceLoader) {
|
||||
final String coreName = core.getName();
|
||||
final CoreContainer cc = core.getCoreDescriptor().getCoreContainer();
|
||||
final String overlayPath = (zkSolrResourceLoader).getConfigSetZkPath() + "/" + ConfigOverlay.RESOURCE_NAME;
|
||||
final String solrConfigPath = (zkSolrResourceLoader).getConfigSetZkPath() + "/" + core.getSolrConfig().getName();
|
||||
return new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
log.info("config update_listener called");
|
||||
SolrZkClient zkClient = cc.getZkController().getZkClient();
|
||||
int solrConfigversion,overlayVersion;
|
||||
try (SolrCore core = cc.getCore(coreName)) {
|
||||
if (core.isClosed()) return;
|
||||
solrConfigversion = core.getSolrConfig().getOverlay().getZnodeVersion();
|
||||
overlayVersion = core.getSolrConfig().getZnodeVersion();
|
||||
}
|
||||
|
||||
if (checkStale(zkClient, overlayPath, solrConfigversion) ||
|
||||
checkStale(zkClient, solrConfigPath, overlayVersion)) {
|
||||
log.info("core reload");
|
||||
cc.reload(coreName);
|
||||
}
|
||||
} catch (KeeperException.NoNodeException nne){
|
||||
//no problem
|
||||
} catch (KeeperException e) {
|
||||
log.error("error refreshing solrconfig ", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().isInterrupted();
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
zkSolrResourceLoader.getZkController().registerConfListenerForCore(zkSolrResourceLoader.getConfigSetZkPath(), core,listener);
|
||||
private static boolean checkStale(SolrZkClient zkClient, String zkPath, int currentVersion) {
|
||||
try {
|
||||
Stat stat = zkClient.exists(zkPath, null, true);
|
||||
if(stat == null){
|
||||
if(currentVersion>0) return true;
|
||||
return false;
|
||||
}
|
||||
if (stat.getVersion() > currentVersion) {
|
||||
log.info(zkPath+" is stale will need an update from {} to {}", currentVersion,stat.getVersion());
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
} catch (KeeperException.NoNodeException nne){
|
||||
//no problem
|
||||
} catch (KeeperException e) {
|
||||
log.error("error refreshing solrconfig ", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().isInterrupted();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@ -136,8 +158,7 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
|
||||
String path = (String) req.getContext().get("path");
|
||||
if(path == null) path="/config";
|
||||
if("/config/overlay".equals(path)){
|
||||
resp.add("overlay", req.getCore().getSolrConfig().getOverlay().toOutputFormat());
|
||||
return;
|
||||
resp.add("overlay", req.getCore().getSolrConfig().getOverlay().toMap());
|
||||
} else {
|
||||
List<String> parts =StrUtils.splitSmart(path, '/');
|
||||
if(parts.get(0).isEmpty()) parts.remove(0);
|
||||
@ -152,13 +173,32 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
|
||||
|
||||
|
||||
private void handlePOST() throws IOException {
|
||||
Iterable<ContentStream> streams = req.getContentStreams();
|
||||
if(streams == null ){
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
|
||||
}
|
||||
Iterable<ContentStream> streams = req.getContentStreams();
|
||||
if (streams == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
|
||||
}
|
||||
ArrayList<CommandOperation> ops = new ArrayList<>();
|
||||
|
||||
for (ContentStream stream : streams)
|
||||
ops.addAll(CommandOperation.parse(stream.getReader()));
|
||||
List<Map> errList = CommandOperation.captureErrors(ops);
|
||||
if(!errList.isEmpty()) {
|
||||
resp.add(CommandOperation.ERR_MSGS,errList);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (ContentStream stream : streams) {
|
||||
runCommandsTillSuccess(stream);
|
||||
for (;;) {
|
||||
ArrayList<CommandOperation> opsCopy = new ArrayList<>(ops.size());
|
||||
ConfigOverlay overlay = SolrConfig.getConfigOverlay(req.getCore().getResourceLoader());
|
||||
for (CommandOperation op : ops) opsCopy.add(op.getCopy());
|
||||
try {
|
||||
handleCommands(opsCopy, overlay);
|
||||
break;
|
||||
} catch (ZkController.ResourceModifiedInZkException e) {
|
||||
//retry
|
||||
log.info("Race condition, the node is modified in ZK by someone else " +e.getMessage());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
resp.setException(e);
|
||||
@ -167,30 +207,21 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
|
||||
|
||||
}
|
||||
|
||||
private void runCommandsTillSuccess(ContentStream stream) throws IOException {
|
||||
for (;;) {
|
||||
try {
|
||||
handleCommands(stream);
|
||||
break;
|
||||
} catch (ZkController.ResourceModifiedInZkException e) {
|
||||
log.info(e.getMessage());
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleCommands( ContentStream stream) throws IOException {
|
||||
ConfigOverlay overlay = req.getCore().getSolrConfig().getOverlay();
|
||||
List<CommandOperation> ops = CommandOperation.parse(stream.getReader());
|
||||
private void handleCommands(List<CommandOperation> ops, ConfigOverlay overlay ) throws IOException {
|
||||
for (CommandOperation op : ops) {
|
||||
if(SET_PROPERTY.equals( op.name) ){
|
||||
overlay = applySetProp(op, overlay);
|
||||
}else if(UNSET_PROPERTY.equals(op.name)){
|
||||
overlay = applyUnset(op,overlay);
|
||||
}else if(SET_USER_PROPERTY.equals(op.name)){
|
||||
overlay = applySetUserProp(op ,overlay);
|
||||
}else if(UNSET_USER_PROPERTY.equals(op.name)){
|
||||
overlay = applyUnsetUserProp(op, overlay);
|
||||
switch (op.name) {
|
||||
case SET_PROPERTY:
|
||||
overlay = applySetProp(op, overlay);
|
||||
break;
|
||||
case UNSET_PROPERTY:
|
||||
overlay = applyUnset(op, overlay);
|
||||
break;
|
||||
case SET_USER_PROPERTY:
|
||||
overlay = applySetUserProp(op, overlay);
|
||||
break;
|
||||
case UNSET_USER_PROPERTY:
|
||||
overlay = applyUnsetUserProp(op, overlay);
|
||||
break;
|
||||
}
|
||||
}
|
||||
List errs = CommandOperation.captureErrors(ops);
|
||||
@ -204,21 +235,6 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
|
||||
ZkController.persistConfigResourceToZooKeeper(loader,overlay.getZnodeVersion(),
|
||||
ConfigOverlay.RESOURCE_NAME,overlay.toByteArray(),true);
|
||||
|
||||
String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
|
||||
Map map = ZkNodeProps.makeMap(CoreAdminParams.ACTION, CollectionParams.CollectionAction.RELOAD.toString() ,
|
||||
CollectionParams.NAME, collectionName);
|
||||
|
||||
SolrQueryRequest solrQueryRequest = new LocalSolrQueryRequest(req.getCore(), new MapSolrParams(map));
|
||||
SolrQueryResponse tmpResp = new SolrQueryResponse();
|
||||
try {
|
||||
//doing a collection reload
|
||||
req.getCore().getCoreDescriptor().getCoreContainer().getCollectionsHandler().handleRequestBody(solrQueryRequest,tmpResp);
|
||||
} catch (Exception e) {
|
||||
String msg = MessageFormat.format("Unable to reload collection {0}", collectionName);
|
||||
log.error(msg);
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg);
|
||||
}
|
||||
|
||||
} else {
|
||||
SolrResourceLoader.persistConfLocally(loader, ConfigOverlay.RESOURCE_NAME, overlay.toByteArray());
|
||||
req.getCore().getCoreDescriptor().getCoreContainer().reload(req.getCore().getName());
|
||||
|
@ -210,5 +210,8 @@ public class CommandOperation {
|
||||
}
|
||||
|
||||
}
|
||||
public CommandOperation getCopy(){
|
||||
return new CommandOperation(name,commandData);
|
||||
}
|
||||
|
||||
}
|
||||
|
142
solr/core/src/test/org/apache/solr/handler/TestConfigReload.java
Normal file
142
solr/core/src/test/org/apache/solr/handler/TestConfigReload.java
Normal file
@ -0,0 +1,142 @@
|
||||
package org.apache.solr.handler;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.StringReader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.MessageFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.solr.client.solrj.SolrServer;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrServer;
|
||||
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.core.SolrConfig;
|
||||
import org.apache.solr.util.RESTfulServerProvider;
|
||||
import org.apache.solr.util.RestTestHarness;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.noggit.JSONParser;
|
||||
import org.noggit.ObjectBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.apache.solr.core.ConfigOverlay.getObjectByPath;
|
||||
|
||||
public class TestConfigReload extends AbstractFullDistribZkTestBase {
|
||||
|
||||
|
||||
static final Logger log = LoggerFactory.getLogger(TestConfigReload.class);
|
||||
private List<RestTestHarness> restTestHarnesses = new ArrayList<>();
|
||||
|
||||
private void setupHarnesses() {
|
||||
for (final SolrServer client : clients) {
|
||||
RestTestHarness harness = new RestTestHarness(new RESTfulServerProvider() {
|
||||
@Override
|
||||
public String getBaseURL() {
|
||||
return ((HttpSolrServer)client).getBaseURL();
|
||||
}
|
||||
});
|
||||
restTestHarnesses.add(harness);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doTest() throws Exception {
|
||||
setupHarnesses();
|
||||
reloadTest();
|
||||
}
|
||||
|
||||
private void reloadTest() throws Exception {
|
||||
SolrZkClient client = cloudClient.getZkStateReader().getZkClient();
|
||||
log.info("live_nodes_count : " + cloudClient.getZkStateReader().getClusterState().getLiveNodes());
|
||||
String confPath = ZkController.CONFIGS_ZKNODE+"/conf1/";
|
||||
// checkConfReload(client, confPath + ConfigOverlay.RESOURCE_NAME, "overlay");
|
||||
checkConfReload(client, confPath + SolrConfig.DEFAULT_CONF_FILE,"solrConfig", "/config");
|
||||
|
||||
}
|
||||
|
||||
private void checkConfReload(SolrZkClient client, String resPath, String name, String uri) throws Exception {
|
||||
Stat stat = new Stat();
|
||||
byte[] data = null;
|
||||
try {
|
||||
data = client.getData(resPath, null, stat, true);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
data = "{}".getBytes(StandardCharsets.UTF_8);
|
||||
log.info("creating_node {}",resPath);
|
||||
client.create(resPath,data, CreateMode.PERSISTENT,true);
|
||||
}
|
||||
long startTime = System.nanoTime();
|
||||
Stat newStat = client.setData(resPath, data, true);
|
||||
client.setData("/configs/conf1", new byte[]{1}, true);
|
||||
assertTrue(newStat.getVersion() > stat.getVersion());
|
||||
log.info("new_version "+ newStat.getVersion());
|
||||
Integer newVersion = newStat.getVersion();
|
||||
long maxTimeoutSeconds = 20;
|
||||
DocCollection coll = cloudClient.getZkStateReader().getClusterState().getCollection("collection1");
|
||||
List<String> urls = new ArrayList<>();
|
||||
for (Slice slice : coll.getSlices()) {
|
||||
for (Replica replica : slice.getReplicas())
|
||||
urls.add(""+replica.get(ZkStateReader.BASE_URL_PROP) + "/"+replica.get(ZkStateReader.CORE_NAME_PROP));
|
||||
}
|
||||
HashSet<String> succeeded = new HashSet<>();
|
||||
|
||||
while ( TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds){
|
||||
Thread.sleep(50);
|
||||
for (String url : urls) {
|
||||
Map respMap = getAsMap(url+uri+"?wt=json");
|
||||
if(String.valueOf(newVersion).equals(String.valueOf( getObjectByPath(respMap, true, asList(name, "znodeVersion"))))){
|
||||
succeeded.add(url);
|
||||
}
|
||||
}
|
||||
if(succeeded.size() == urls.size()) break;
|
||||
succeeded.clear();
|
||||
}
|
||||
assertEquals(MessageFormat.format("tried these servers {0} succeeded only in {1} ", urls,succeeded) , urls.size(), succeeded.size());
|
||||
}
|
||||
|
||||
private Map getAsMap(String uri) throws Exception {
|
||||
HttpGet get = new HttpGet(uri) ;
|
||||
HttpEntity entity = null;
|
||||
try {
|
||||
entity = cloudClient.getLbServer().getHttpClient().execute(get).getEntity();
|
||||
String response = EntityUtils.toString(entity, StandardCharsets.UTF_8);
|
||||
return (Map) ObjectBuilder.getVal(new JSONParser(new StringReader(response)));
|
||||
} finally {
|
||||
EntityUtils.consumeQuietly(entity);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
@ -67,7 +67,6 @@ public class TestSolrConfigHandlerConcurrent extends AbstractFullDistribZkTestBa
|
||||
|
||||
@Override
|
||||
public void doTest() throws Exception {
|
||||
|
||||
Map editable_prop_map = (Map) new ObjectBuilder(new JSONParser(new StringReader(
|
||||
ConfigOverlay.MAPPING))).getObject();
|
||||
Map caches = (Map) editable_prop_map.get("query");
|
||||
@ -147,7 +146,7 @@ public class TestSolrConfigHandlerConcurrent extends AbstractFullDistribZkTestBa
|
||||
RestTestHarness harness = restTestHarnesses.get(r.nextInt(restTestHarnesses.size()));
|
||||
long startTime = System.nanoTime();
|
||||
boolean success = false;
|
||||
long maxTimeoutSeconds = 10;
|
||||
long maxTimeoutSeconds = 20;
|
||||
while ( TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds) {
|
||||
Thread.sleep(100);
|
||||
errmessages.clear();
|
||||
@ -155,19 +154,19 @@ public class TestSolrConfigHandlerConcurrent extends AbstractFullDistribZkTestBa
|
||||
Map m = (Map) respMap.get("overlay");
|
||||
if(m!= null) m = (Map) m.get("props");
|
||||
if(m == null) {
|
||||
errmessages.add(MessageFormat.format( "overlay does not exist for cache: {} , iteration: {} response {} ", cacheName, i, respMap.toString()));
|
||||
errmessages.add(MessageFormat.format( "overlay does not exist for cache: {0} , iteration: {1} response {2} ", cacheName, i, respMap.toString()));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
Object o = getObjectByPath(m, true, asList("query", cacheName, "size"));
|
||||
if(!val1.equals(o)) errmessages.add(MessageFormat.format("'size' property not set, expected = {}, actual {}", val1,o));
|
||||
if(!val1.equals(o)) errmessages.add(MessageFormat.format("'size' property not set, expected = {0}, actual {1}", val1,o));
|
||||
|
||||
o = getObjectByPath(m, true, asList("query", cacheName, "initialSize"));
|
||||
if(!val2.equals(o)) errmessages.add(MessageFormat.format("'initialSize' property not set, expected = {}, actual {}", val2,o));
|
||||
if(!val2.equals(o)) errmessages.add(MessageFormat.format("'initialSize' property not set, expected = {0}, actual {1}", val2,o));
|
||||
|
||||
o = getObjectByPath(m, true, asList("query", cacheName, "autowarmCount"));
|
||||
if(!val3.equals(o)) errmessages.add(MessageFormat.format("'autowarmCount' property not set, expected = {}, actual {}", val3,o));
|
||||
if(!val3.equals(o)) errmessages.add(MessageFormat.format("'autowarmCount' property not set, expected = {0}, actual {1}", val3,o));
|
||||
if(errmessages.isEmpty()) break;
|
||||
}
|
||||
if(!errmessages.isEmpty()) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user