From 72e8c75c291134bfacb9ee8b18a365a69eed44d6 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Tue, 31 Mar 2015 16:18:18 +0000 Subject: [PATCH] SOLR-6924: The config API forcefully refreshes all replicas in the collection to ensure all are updated git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1670381 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 + .../org/apache/solr/cloud/ZkController.java | 9 +- .../org/apache/solr/core/ConfigOverlay.java | 4 +- .../org/apache/solr/core/RequestParams.java | 1 + .../java/org/apache/solr/core/SolrConfig.java | 3 +- .../java/org/apache/solr/core/SolrCore.java | 14 +- .../solr/handler/SolrConfigHandler.java | 276 +++++++++++++++++- .../solr/core/TestSolrConfigHandler.java | 1 - .../apache/solr/handler/TestReqParamsAPI.java | 1 - .../org/apache/solr/common/cloud/Replica.java | 5 + .../solr/common/params/CommonParams.java | 4 + 11 files changed, 293 insertions(+), 28 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 59401d570ef..dd265b81e1a 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -344,6 +344,9 @@ Bug Fixes * SOLR-7309: Make bin/solr, bin/post work when Solr installation directory contains spaces (Ramkumar Aiyengar, Martijn Koster) +* SOLR-6924: The config API forcefully refreshes all replicas in the collection to ensure all are + updated (Noble Paul) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 15f1118770a..15f7c46e525 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -2175,9 +2175,10 @@ public final class ZkController { * * @return true on success */ - public static boolean persistConfigResourceToZooKeeper(ZkSolrResourceLoader zkLoader, int znodeVersion, + public static int persistConfigResourceToZooKeeper(ZkSolrResourceLoader zkLoader, int znodeVersion, String resourceName, byte[] content, boolean createIfNotExists) { + int latestVersion = znodeVersion; final ZkController zkController = zkLoader.getZkController(); final SolrZkClient zkClient = zkController.getZkClient(); final String resourceLocation = zkLoader.getConfigSetZkPath() + "/" + resourceName; @@ -2185,17 +2186,19 @@ public final class ZkController { try { try { zkClient.setData(resourceLocation, content, znodeVersion, true); + latestVersion = znodeVersion + 1;// if the set succeeded , it should have incremented the version by one always log.info("Persisted config data to node {} ", resourceLocation); touchConfDir(zkLoader); } catch (NoNodeException e) { if (createIfNotExists) { try { zkClient.create(resourceLocation, content, CreateMode.PERSISTENT, true); + latestVersion = 0;//just created so version must be zero touchConfDir(zkLoader); } catch (KeeperException.NodeExistsException nee) { try { Stat stat = zkClient.exists(resourceLocation, null, true); - log.info("failed to set data version in zk is {0} and expected version is {1} ", stat.getVersion(), znodeVersion); + log.info("failed to set data version in zk is {} and expected version is {} ", stat.getVersion(), znodeVersion); } catch (Exception e1) { log.warn("could not get stat"); } @@ -2227,7 +2230,7 @@ public final class ZkController { log.error(msg, e); throw new SolrException(ErrorCode.SERVER_ERROR, msg, e); } - return true; + return latestVersion; } public static void touchConfDir(ZkSolrResourceLoader zkLoader) { diff --git a/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java b/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java index 6ace75b92bf..46cbfa2e27d 100644 --- a/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java +++ b/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java @@ -187,14 +187,14 @@ public class ConfigOverlay implements MapSerializable { public static final String RESOURCE_NAME = "configoverlay.json"; - private static final Long STR_ATTR = 0L; + /*private static final Long STR_ATTR = 0L; private static final Long STR_NODE = 1L; private static final Long BOOL_ATTR = 10L; private static final Long BOOL_NODE = 11L; private static final Long INT_ATTR = 20L; private static final Long INT_NODE = 21L; private static final Long FLOAT_ATTR = 30L; - private static final Long FLOAT_NODE = 31L; + private static final Long FLOAT_NODE = 31L;*/ private static Map editable_prop_map; //The path maps to the xml xpath and value of 1 means it is a tag with a string value and value diff --git a/solr/core/src/java/org/apache/solr/core/RequestParams.java b/solr/core/src/java/org/apache/solr/core/RequestParams.java index 17f4123f841..aae2a0b804d 100644 --- a/solr/core/src/java/org/apache/solr/core/RequestParams.java +++ b/solr/core/src/java/org/apache/solr/core/RequestParams.java @@ -148,6 +148,7 @@ public class RequestParams implements MapSerializable { ZkSolrResourceLoader resourceLoader = (ZkSolrResourceLoader) loader; try { Stat stat = resourceLoader.getZkController().getZkClient().exists(resourceLoader.getConfigSetZkPath() + "/" + RequestParams.RESOURCE, null, true); + log.debug("latest version of {} in ZK is : {}", resourceLoader.getConfigSetZkPath() + "/" + RequestParams.RESOURCE, stat == null ? "": stat.getVersion()); if (stat == null) { requestParams = new RequestParams(Collections.EMPTY_MAP, -1); } else if (requestParams == null || stat.getVersion() > requestParams.getZnodeVersion()) { diff --git a/solr/core/src/java/org/apache/solr/core/SolrConfig.java b/solr/core/src/java/org/apache/solr/core/SolrConfig.java index 1126a919e54..121165ec8ff 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrConfig.java +++ b/solr/core/src/java/org/apache/solr/core/SolrConfig.java @@ -77,6 +77,7 @@ import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.solr.core.ConfigOverlay.ZNODEVER; import static org.apache.solr.core.SolrConfig.PluginOpts.LAZY; import static org.apache.solr.core.SolrConfig.PluginOpts.MULTI_OK; import static org.apache.solr.core.SolrConfig.PluginOpts.NOOP; @@ -819,7 +820,7 @@ public class SolrConfig extends Config implements MapSerializable { @Override public Map toMap() { LinkedHashMap result = new LinkedHashMap(); - if (getZnodeVersion() > -1) result.put("znodeVersion", getZnodeVersion()); + if (getZnodeVersion() > -1) result.put(ZNODEVER, getZnodeVersion()); result.put("luceneMatchVersion", luceneMatchVersion); result.put("updateHandler", getUpdateHandlerInfo().toMap()); Map m = new LinkedHashMap(); diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index 4afaf8a65a7..a7506ce6566 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -66,6 +66,7 @@ import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; import org.apache.solr.client.solrj.impl.BinaryResponseParser; import org.apache.solr.cloud.CloudDescriptor; @@ -132,7 +133,6 @@ import org.apache.solr.update.processor.RunUpdateProcessorFactory; import org.apache.solr.update.processor.UpdateRequestProcessorChain; import org.apache.solr.update.processor.UpdateRequestProcessorChain.ProcessorInfo; import org.apache.solr.update.processor.UpdateRequestProcessorFactory; -import org.apache.solr.util.ConcurrentLRUCache; import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.PropertiesInputStream; import org.apache.solr.util.RefCounted; @@ -2077,13 +2077,13 @@ public final class SolrCore implements SolrInfoMBean, Closeable { HashMap m= new HashMap<>(); m.put("xml", new XMLResponseWriter()); m.put("standard", m.get("xml")); - m.put("json", new JSONResponseWriter()); + m.put(CommonParams.JSON, new JSONResponseWriter()); m.put("python", new PythonResponseWriter()); m.put("php", new PHPResponseWriter()); m.put("phps", new PHPSerializedResponseWriter()); m.put("ruby", new RubyResponseWriter()); m.put("raw", new RawResponseWriter()); - m.put("javabin", new BinaryResponseWriter()); + m.put(CommonParams.JAVABIN, new BinaryResponseWriter()); m.put("csv", new CSVResponseWriter()); m.put("xsort", new SortingResponseWriter()); m.put("schema.xml", new SchemaXmlResponseWriter()); @@ -2463,12 +2463,12 @@ public final class SolrCore implements SolrInfoMBean, Closeable { zkSolrResourceLoader.getZkController().registerConfListenerForCore( zkSolrResourceLoader.getConfigSetZkPath(), this, - getListener(this, zkSolrResourceLoader)); + getConfListener(this, zkSolrResourceLoader)); } - private static Runnable getListener(SolrCore core, ZkSolrResourceLoader zkSolrResourceLoader) { + public static Runnable getConfListener(SolrCore core, ZkSolrResourceLoader zkSolrResourceLoader) { final String coreName = core.getName(); final CoreContainer cc = core.getCoreDescriptor().getCoreContainer(); final String overlayPath = zkSolrResourceLoader.getConfigSetZkPath() + "/" + ConfigOverlay.RESOURCE_NAME; @@ -2506,9 +2506,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { cc.reload(coreName); return; } - //some files in conf directoy has changed other than schema.xml, - // solrconfig.xml. so fire event listeners - + //some files in conf directory may have other than managedschema, overlay, params try (SolrCore core = cc.solrCores.getCoreFromAnyList(coreName, true)) { if (core == null || core.isClosed()) return; for (Runnable listener : core.confListeners) { diff --git a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java index 86571020bdc..b6fb550a551 100644 --- a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java @@ -21,6 +21,7 @@ package org.apache.solr.handler; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -29,35 +30,63 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.google.common.collect.ImmutableSet; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.cloud.ZkCLI; import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.ZkSolrResourceLoader; import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.MapSolrParams; +import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.StrUtils; import org.apache.solr.core.ConfigOverlay; import org.apache.solr.core.PluginInfo; import org.apache.solr.core.ImplicitPlugins; import org.apache.solr.core.RequestParams; import org.apache.solr.core.SolrConfig; +import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrResourceLoader; +import org.apache.solr.handler.admin.CollectionsHandler; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestHandler; +import org.apache.solr.response.BinaryResponseWriter; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.schema.SchemaManager; import org.apache.solr.util.CommandOperation; +import org.apache.solr.util.DefaultSolrThreadFactory; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.util.Collections.singletonList; +import static org.apache.solr.common.cloud.ZkNodeProps.makeMap; import static org.apache.solr.common.params.CoreAdminParams.NAME; import static org.apache.solr.common.util.StrUtils.formatString; import static org.apache.solr.core.ConfigOverlay.NOT_EDITABLE; +import static org.apache.solr.core.ConfigOverlay.ZNODEVER; import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_CLASS; import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME; import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME_IN_OVERLAY; @@ -67,6 +96,7 @@ public class SolrConfigHandler extends RequestHandlerBase { public static final Logger log = LoggerFactory.getLogger(SolrConfigHandler.class); public static final boolean configEditing_disabled = Boolean.getBoolean("disable.configEdit"); private static final Map namedPlugins; + private Lock reloadLock = new ReentrantLock(true); static { Map map = new HashMap<>(); @@ -99,7 +129,7 @@ public class SolrConfigHandler extends RequestHandlerBase { } - private static class Command { + private class Command { private final SolrQueryRequest req; private final SolrQueryResponse resp; private final String method; @@ -122,6 +152,7 @@ public class SolrConfigHandler extends RequestHandlerBase { private void handleGET() { if (parts.size() == 1) { + //this is the whole config. sent out the whole payload resp.add("config", getConfigDetails()); } else { if (ConfigOverlay.NAME.equals(parts.get(1))) { @@ -131,9 +162,9 @@ public class SolrConfigHandler extends RequestHandlerBase { RequestParams params = req.getCore().getSolrConfig().getRequestParams(); MapSolrParams p = params.getParams(parts.get(2)); Map m = new LinkedHashMap<>(); - m.put(ConfigOverlay.ZNODEVER, params.getZnodeVersion()); + m.put(ZNODEVER, params.getZnodeVersion()); if (p != null) { - m.put(RequestParams.NAME, ZkNodeProps.makeMap(parts.get(2), p.getMap())); + m.put(RequestParams.NAME, makeMap(parts.get(2), p.getMap())); } resp.add(SolrQueryResponse.NAME, m); } else { @@ -141,8 +172,53 @@ public class SolrConfigHandler extends RequestHandlerBase { } } else { - Map m = getConfigDetails(); - resp.add("config", ZkNodeProps.makeMap(parts.get(1), m.get(parts.get(1)))); + if (ZNODEVER.equals(parts.get(1))) { + resp.add(ZNODEVER, ZkNodeProps.makeMap( + ConfigOverlay.NAME, req.getCore().getSolrConfig().getOverlay().getZnodeVersion(), + RequestParams.NAME, req.getCore().getSolrConfig().getRequestParams().getZnodeVersion())); + boolean checkStale = false; + int expectedVersion = req.getParams().getInt(ConfigOverlay.NAME, -1); + int actualVersion = req.getCore().getSolrConfig().getOverlay().getZnodeVersion(); + if (expectedVersion > actualVersion) { + log.info("expecting overlay version {} but my version is {}", expectedVersion, actualVersion); + checkStale = true; + } else if (expectedVersion != -1) { + log.info("I already have the expected version {} of config", expectedVersion); + } + expectedVersion = req.getParams().getInt(RequestParams.NAME, -1); + actualVersion = req.getCore().getSolrConfig().getRequestParams().getZnodeVersion(); + if (expectedVersion > actualVersion) { + log.info("expecting params version {} but my version is {}", expectedVersion, actualVersion); + checkStale = true; + } else if (expectedVersion != -1) { + log.info("I already have the expected version {} of params", expectedVersion); + } + if (checkStale && req.getCore().getResourceLoader() instanceof ZkSolrResourceLoader) { + new Thread(SolrConfigHandler.class.getSimpleName() + "-refreshconf") { + @Override + public void run() { + if (!reloadLock.tryLock()) { + log.info("Another reload is in progress . Not doing anything"); + return; + } + try { + log.info("Trying to update my configs"); + SolrCore.getConfListener(req.getCore(), (ZkSolrResourceLoader) req.getCore().getResourceLoader()).run(); + } catch (Exception e) { + log.error("Unable to refresh conf ", e); + } finally { + reloadLock.unlock(); + } + } + }.start(); + } else { + log.info("checkStale {} , resourceloader {}", checkStale, req.getCore().getResourceLoader().getClass().getName()); + } + + } else { + Map m = getConfigDetails(); + resp.add("config", makeMap(parts.get(1), m.get(parts.get(1)))); + } } } } @@ -277,8 +353,15 @@ public class SolrConfigHandler extends RequestHandlerBase { if (ops.isEmpty()) { ZkController.touchConfDir(zkLoader); } else { - ZkController.persistConfigResourceToZooKeeper(zkLoader, params.getZnodeVersion(), - RequestParams.RESOURCE, params.toByteArray(), true); + log.info("persisting params version : {}", params.toMap()); + int latestVersion = ZkController.persistConfigResourceToZooKeeper(zkLoader, + params.getZnodeVersion(), + RequestParams.RESOURCE, + params.toByteArray(), true); + waitForAllReplicasState(req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName(), + req.getCore().getCoreDescriptor().getCoreContainer().getZkController(), + RequestParams.NAME, + latestVersion, 30); } } else { @@ -326,17 +409,20 @@ public class SolrConfigHandler extends RequestHandlerBase { } List errs = CommandOperation.captureErrors(ops); if (!errs.isEmpty()) { - log.info("Failed to run commands errors are {}", StrUtils.join(errs, ',')); + log.info("Failed to run commands. errors are {}", StrUtils.join(errs, ',')); resp.add(CommandOperation.ERR_MSGS, errs); return; } SolrResourceLoader loader = req.getCore().getResourceLoader(); if (loader instanceof ZkSolrResourceLoader) { - ZkController.persistConfigResourceToZooKeeper((ZkSolrResourceLoader) loader, overlay.getZnodeVersion(), + int latestVersion = ZkController.persistConfigResourceToZooKeeper((ZkSolrResourceLoader) loader, overlay.getZnodeVersion(), ConfigOverlay.RESOURCE_NAME, overlay.toByteArray(), true); - - log.info("Executed config commands successfully and persited to ZK {}", ops); + log.info("Executed config commands successfully and persisted to ZK {}", ops); + waitForAllReplicasState(req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName(), + req.getCore().getCoreDescriptor().getCoreContainer().getZkController(), + ConfigOverlay.NAME, + latestVersion, 30); } else { SolrResourceLoader.persistConfLocally(loader, ConfigOverlay.RESOURCE_NAME, overlay.toByteArray()); req.getCore().getCoreDescriptor().getCoreContainer().reload(req.getCore().getName()); @@ -519,7 +605,7 @@ public class SolrConfigHandler extends RequestHandlerBase { private static Set subPaths = new HashSet<>(Arrays.asList("/overlay", "/params", - "/query", "/jmx", "/requestDispatcher")); + "/query", "/jmx", "/requestDispatcher", "/znodeVersion")); static { for (SolrConfig.SolrPluginInfo solrPluginInfo : SolrConfig.plugins) @@ -556,4 +642,170 @@ public class SolrConfigHandler extends RequestHandlerBase { public static final String CREATE = "create"; private static Set cmdPrefixes = ImmutableSet.of(CREATE, UPDATE, "delete", "add"); + /** + * Block up to a specified maximum time until we see agreement on the schema + * version in ZooKeeper across all replicas for a collection. + */ + private static void waitForAllReplicasState(String collection, + ZkController zkController, + String prop, + int expectedVersion, + int maxWaitSecs) { + long startMs = System.currentTimeMillis(); + // get a list of active replica cores to query for the schema zk version (skipping this core of course) + List concurrentTasks = new ArrayList<>(); + + for (String coreUrl : getActiveReplicaCoreUrls(zkController, collection)) { + PerReplicaCallable e = new PerReplicaCallable(coreUrl, prop, expectedVersion, maxWaitSecs); + concurrentTasks.add(e); + } + if (concurrentTasks.isEmpty()) return; // nothing to wait for ... + + log.info(formatString("Waiting up to {0} secs for {1} replicas to set the property {2} to be of version {3} for collection {4}", + maxWaitSecs, concurrentTasks.size(), prop, expectedVersion, collection)); + + // use an executor service to invoke schema zk version requests in parallel with a max wait time + int poolSize = Math.min(concurrentTasks.size(), 10); + ExecutorService parallelExecutor = + Executors.newFixedThreadPool(poolSize, new DefaultSolrThreadFactory("solrHandlerExecutor")); + try { + List> results = + parallelExecutor.invokeAll(concurrentTasks, maxWaitSecs, TimeUnit.SECONDS); + + // determine whether all replicas have the update + List failedList = null; // lazily init'd + for (int f = 0; f < results.size(); f++) { + Boolean success = false; + Future next = results.get(f); + if (next.isDone() && !next.isCancelled()) { + // looks to have finished, but need to check if it succeeded + try { + success = next.get(); + } catch (ExecutionException e) { + // shouldn't happen since we checked isCancelled + } + } + + if (!success) { + String coreUrl = concurrentTasks.get(f).coreUrl; + log.warn("Core " + coreUrl + "could not get the expected version " + expectedVersion); + if (failedList == null) failedList = new ArrayList<>(); + failedList.add(coreUrl); + } + } + + // if any tasks haven't completed within the specified timeout, it's an error + if (failedList != null) + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + formatString("{0} out of {1} the property {2} to be of version {3} within {4} seconds! Failed cores: {5}", + failedList.size(), concurrentTasks.size() + 1, prop, expectedVersion, maxWaitSecs, failedList)); + + } catch (InterruptedException ie) { + log.warn(formatString( + "Core was interrupted . trying to set the property {1} to version {2} to propagate to {3} replicas for collection {4}", + prop, expectedVersion, concurrentTasks.size(), collection)); + Thread.currentThread().interrupt(); + } finally { + if (!parallelExecutor.isShutdown()) + parallelExecutor.shutdownNow(); + } + + long diffMs = (System.currentTimeMillis() - startMs); + log.info(formatString( + "Took {0} secs to set the property {1} to be of version {2} for collection {3}", + Math.round(diffMs / 1000d), prop, expectedVersion, collection)); + } + + public static List getActiveReplicaCoreUrls(ZkController zkController, + String collection) { + List activeReplicaCoreUrls = new ArrayList<>(); + ClusterState clusterState = zkController.getZkStateReader().getClusterState(); + Set liveNodes = clusterState.getLiveNodes(); + Collection activeSlices = clusterState.getActiveSlices(collection); + if (activeSlices != null && activeSlices.size() > 0) { + for (Slice next : activeSlices) { + Map replicasMap = next.getReplicasMap(); + if (replicasMap != null) { + for (Map.Entry entry : replicasMap.entrySet()) { + Replica replica = entry.getValue(); + if (ZkStateReader.ACTIVE.equals(replica.getStr(ZkStateReader.STATE_PROP)) && + liveNodes.contains(replica.getNodeName())) { + activeReplicaCoreUrls.add(replica.getCoreUrl()); + } + } + } + } + } + return activeReplicaCoreUrls; + } + + private static class PerReplicaCallable extends SolrRequest implements Callable { + String coreUrl; + String prop; + int expectedZkVersion; + Number remoteVersion = null; + int maxWait; + + PerReplicaCallable(String coreUrl, String prop, int expectedZkVersion, int maxWait) { + super(METHOD.GET, "/config/" + ZNODEVER); + this.coreUrl = coreUrl; + this.expectedZkVersion = expectedZkVersion; + this.prop = prop; + this.maxWait = maxWait; + } + + @Override + public SolrParams getParams() { + return new ModifiableSolrParams() + .set(prop, expectedZkVersion) + .set(CommonParams.WT, CommonParams.JAVABIN); + } + + @Override + public Boolean call() throws Exception { + long startTime = System.currentTimeMillis(); + int attempts = 0; + try (HttpSolrClient solr = new HttpSolrClient(coreUrl)) { + // eventually, this loop will get killed by the ExecutorService's timeout + while (true) { + try { + long timeElapsed = (System.currentTimeMillis() - startTime) / 1000; + if (timeElapsed >= maxWait) { + return false; + } + log.info("Time elapsed : {} secs, maxWait {}", timeElapsed, maxWait); + Thread.sleep(100); + NamedList resp = solr.httpUriRequest(this).future.get(); + if (resp != null) { + Map m = (Map) resp.get(ZNODEVER); + if (m != null) { + remoteVersion = (Number) m.get(prop); + if (remoteVersion != null && remoteVersion.intValue() >= expectedZkVersion) break; + } + } + + attempts++; + log.info(formatString("Could not get expectedVersion {0} from {1} for prop {2} after {3} attempts", expectedZkVersion, coreUrl, prop, attempts)); + } catch (Exception e) { + if (e instanceof InterruptedException) { + break; // stop looping + } else { + log.warn("Failed to get /schema/zkversion from " + coreUrl + " due to: " + e); + } + } + } + } + return true; + } + + @Override + public Collection getContentStreams() throws IOException { + return null; + } + + @Override + protected SolrResponse createResponse(SolrClient client) { + return null; + } + } } diff --git a/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java b/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java index aa469215cca..5717ae9a3cf 100644 --- a/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java +++ b/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java @@ -21,7 +21,6 @@ package org.apache.solr.core; import java.io.File; import java.io.IOException; import java.io.StringReader; -import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; import java.util.List; diff --git a/solr/core/src/test/org/apache/solr/handler/TestReqParamsAPI.java b/solr/core/src/test/org/apache/solr/handler/TestReqParamsAPI.java index 2e6e038656f..45745b2923b 100644 --- a/solr/core/src/test/org/apache/solr/handler/TestReqParamsAPI.java +++ b/solr/core/src/test/org/apache/solr/handler/TestReqParamsAPI.java @@ -41,7 +41,6 @@ import static org.apache.solr.handler.TestSolrConfigHandlerCloud.compareValues; * limitations under the License. */ -@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6924") public class TestReqParamsAPI extends AbstractFullDistribZkTestBase { static final Logger log = LoggerFactory.getLogger(TestSolrConfigHandlerCloud.class); private List restTestHarnesses = new ArrayList<>(); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java index ecd4837d7b2..a51bdc077b7 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java @@ -21,6 +21,8 @@ import org.noggit.JSONUtil; import java.util.Map; +import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; public class Replica extends ZkNodeProps { private final String name; @@ -35,6 +37,9 @@ public class Replica extends ZkNodeProps { public String getName() { return name; } + public String getCoreUrl() { + return ZkCoreNodeProps.getCoreUrl(getStr(BASE_URL_PROP), getStr(CORE_NAME_PROP)); + } /** The name of the node this replica resides on */ public String getNodeName() { diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java index 699059aa2fc..741a357a39f 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java @@ -224,5 +224,9 @@ public interface CommonParams { * When querying a node, prefer local node's cores for distributed queries. */ public static final String PREFER_LOCAL_SHARDS = "preferLocalShards"; + + public static final String JAVABIN = "javabin"; + + public static final String JSON = "json"; }