SOLR-9106: Cache cluster properties on ZkStateReader

This commit is contained in:
Alan Woodward 2016-05-13 22:22:23 +01:00
parent ec4bcf1c70
commit dd23fa4015
18 changed files with 312 additions and 255 deletions

View File

@ -208,6 +208,8 @@ Optimizations
* SOLR-9014: Deprecate and reduce usage of ClusterState methods which may make calls to ZK via
the lazy collection reference. (Scott Blum, shalin)
* SOLR-9106: Cluster properties are now cached on ZkStateReader. (Alan Woodward)
Other Changes
----------------------
* SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy.

View File

@ -1062,8 +1062,9 @@ public class Overseer implements Closeable {
throw new RuntimeException(e);
}
}
public static boolean isLegacy(Map clusterProps) {
return !"false".equals(clusterProps.get(ZkStateReader.LEGACY_CLOUD));
public static boolean isLegacy(ZkStateReader stateReader) {
String legacyProperty = stateReader.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "true");
return !"false".equals(legacyProperty);
}
public ZkStateReader getZkStateReader() {

View File

@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@ -148,7 +147,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
// TODO: extract to configurable strategy class ??
ClusterState clusterState = zkStateReader.getClusterState();
//check if we have disabled autoAddReplicas cluster wide
String autoAddReplicas = (String) zkStateReader.getClusterProps().get(ZkStateReader.AUTO_ADD_REPLICAS);
String autoAddReplicas = zkStateReader.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
return;
}
@ -229,7 +228,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
private boolean addReplica(final String collection, DownReplica badReplica) {
// first find best home - first strategy, sort by number of cores
// hosted where maxCoresPerNode is not violated
final Integer maxCoreCount = (Integer) zkStateReader.getClusterProps().get(ZkStateReader.MAX_CORES_PER_NODE);
final Integer maxCoreCount = zkStateReader.getClusterProperty(ZkStateReader.MAX_CORES_PER_NODE, (Integer) null);
final String createUrl = getBestCreateUrl(zkStateReader, badReplica, maxCoreCount);
if (createUrl == null) {
log.warn("Could not find a node to create new replica on.");

View File

@ -1894,7 +1894,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
positionVsNodes = identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
}
boolean isLegacyCloud = Overseer.isLegacy(zkStateReader.getClusterProps());
boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
createConfNode(configName, collectionName, isLegacyCloud);
@ -2126,7 +2126,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
ModifiableSolrParams params = new ModifiableSolrParams();
if (!Overseer.isLegacy(zkStateReader.getClusterProps())) {
if (!Overseer.isLegacy(zkStateReader)) {
if (!skipCreateReplicaInClusterState) {
ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP,
collection, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName,

View File

@ -16,7 +16,16 @@
*/
package org.apache.solr.cloud;
import static org.apache.solr.common.params.CommonParams.*;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@ -28,26 +37,16 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
public class ZkCLI {
@ -324,28 +323,12 @@ public class ZkCLI {
//If -val option is missing, we will use the null value. This is required to maintain
//compatibility with Collections API.
String propertyValue = line.getOptionValue(VALUE_LONG);
ZkStateReader reader = new ZkStateReader(zkClient);
ClusterProperties props = new ClusterProperties(zkClient);
try {
reader.setClusterProperty(propertyName, propertyValue);
} catch (SolrException ex) {
//This can happen if two concurrent invocations of this command collide
//with each other. Here we are just adding a defensive check to see if
//the value is already set to expected value. If yes, then we don't
//fail the command.
Throwable cause = ex.getCause();
if(cause instanceof KeeperException.NodeExistsException
|| cause instanceof KeeperException.BadVersionException) {
String currentValue = (String)reader.getClusterProps().get(propertyName);
if((currentValue == propertyValue) || (currentValue != null && currentValue.equals(propertyValue))) {
return;
}
}
System.out.println("Unable to set the cluster property due to following error : " +
ex.getLocalizedMessage() +
((cause instanceof KeeperException.BadVersionException)?". Try again":""));
props.setClusterProperty(propertyName, propertyValue);
} catch (IOException ex) {
System.out.println("Unable to set the cluster property due to following error : " + ex.getLocalizedMessage());
System.exit(1);
} finally {
reader.close();
}
} else {
// If not cmd matches

View File

@ -151,7 +151,7 @@ public final class ZkController {
private final int localHostPort; // example: 54065
private final String hostName; // example: 127.0.0.1
private final String nodeName; // example: 127.0.0.1:54065_solr
private final String baseURL; // example: http://127.0.0.1:54065/solr
private String baseURL; // example: http://127.0.0.1:54065/solr
private final CloudConfig cloudConfig;
@ -386,8 +386,6 @@ public final class ZkController {
if (cc != null) cc.securityNodeChanged();
});
this.baseURL = zkStateReader.getBaseUrlForNodeName(this.nodeName);
init(registerOnReconnect);
}
@ -642,6 +640,7 @@ public final class ZkController {
try {
createClusterZkNodes(zkClient);
zkStateReader.createClusterStateWatchersAndUpdate();
this.baseURL = zkStateReader.getBaseUrlForNodeName(this.nodeName);
// start the overseer first as following code may need it's processing
if (!zkRunOnly) {
@ -1488,7 +1487,7 @@ public final class ZkController {
}
private void checkStateInZk(CoreDescriptor cd) throws InterruptedException {
if (!Overseer.isLegacy(zkStateReader.getClusterProps())) {
if (!Overseer.isLegacy(zkStateReader)) {
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
String coreNodeName = cloudDesc.getCoreNodeName();
assert coreNodeName != null : "SolrCore: " + cd.getName() + " has no coreNodeName";

View File

@ -197,7 +197,7 @@ public class ReplicaMutator {
}
public ZkWriteCommand setState(ClusterState clusterState, ZkNodeProps message) {
if (Overseer.isLegacy(zkStateReader.getClusterProps())) {
if (Overseer.isLegacy(zkStateReader)) {
return updateState(clusterState, message);
} else {
return updateStateNew(clusterState, message);

View File

@ -737,7 +737,7 @@ public class CoreContainer {
boolean preExisitingZkEntry = false;
try {
if (getZkController() != null) {
if (!Overseer.isLegacy(getZkController().getZkStateReader().getClusterProps())) {
if (!Overseer.isLegacy(getZkController().getZkStateReader())) {
if (cd.getCloudDescriptor().getCoreNodeName() == null) {
throw new SolrException(ErrorCode.SERVER_ERROR, "non legacy mode coreNodeName missing " + parameters.toString());

View File

@ -152,7 +152,7 @@ public class ClusterStatus {
clusterStatus.add("collections", collectionProps);
// read cluster properties
Map clusterProps = zkStateReader.getClusterProps();
Map clusterProps = zkStateReader.getClusterProperties();
if (clusterProps != null && !clusterProps.isEmpty()) {
clusterStatus.add("properties", clusterProps);
}

View File

@ -51,17 +51,8 @@ import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.*;
import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
@ -570,7 +561,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
String name = req.getParams().required().get(NAME);
String val = req.getParams().get(VALUE_LONG);
h.coreContainer.getZkController().getZkStateReader().setClusterProperty(name, val);
ClusterProperties cp = new ClusterProperties(h.coreContainer.getZkController().getZkClient());
cp.setClusterProperty(name, val);
return null;
}
},
@ -808,7 +800,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
String location = req.getParams().get("location");
if (location == null) {
location = (String) h.coreContainer.getZkController().getZkStateReader().getClusterProps().get("location");
location = h.coreContainer.getZkController().getZkStateReader().getClusterProperty("location", (String) null);
}
if (location == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query parameter or set as a cluster property");
@ -832,7 +824,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
String location = req.getParams().get("location");
if (location == null) {
location = (String) h.coreContainer.getZkController().getZkStateReader().getClusterProps().get("location");
location = h.coreContainer.getZkController().getZkStateReader().getClusterProperty("location", (String) null);
}
if (location == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query parameter or set as a cluster property");

View File

@ -1320,7 +1320,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
boolean changed = false;
while(! timeout.hasTimedOut()){
Thread.sleep(10);
changed = Objects.equals(val,client.getZkStateReader().getClusterProps().get(name));
changed = Objects.equals(val,client.getZkStateReader().getClusterProperty(name, (String) null));
if(changed) break;
}
return changed;

View File

@ -45,7 +45,6 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.solr.cloud.ReplicaPropertiesBase.verifyUniqueAcrossCollection;
@ -336,7 +335,7 @@ public class CollectionsAPISolrJTest extends AbstractFullDistribZkTestBase {
while(! timeout.hasTimedOut()){
Thread.sleep(10);
changed = Objects.equals("false",
cloudClient.getZkStateReader().getClusterProps().get(ZkStateReader.LEGACY_CLOUD));
cloudClient.getZkStateReader().getClusterProperty(ZkStateReader.LEGACY_CLOUD, "none"));
if(changed) break;
}
assertTrue("The Cluster property wasn't set", changed);
@ -351,7 +350,7 @@ public class CollectionsAPISolrJTest extends AbstractFullDistribZkTestBase {
changed = false;
while(! timeout.hasTimedOut()) {
Thread.sleep(10);
changed = (cloudClient.getZkStateReader().getClusterProps().get(ZkStateReader.LEGACY_CLOUD) == null);
changed = (cloudClient.getZkStateReader().getClusterProperty(ZkStateReader.LEGACY_CLOUD, (String) null) == null);
if(changed)
break;
}

View File

@ -16,11 +16,17 @@
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@ -48,32 +54,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.getCurrentArguments;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.easymock.EasyMock.*;
public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
@ -284,11 +265,12 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
}).anyTimes();
}
zkStateReaderMock.getClusterProps();
expectLastCall().andAnswer(new IAnswer<Map>() {
zkStateReaderMock.getClusterProperty("legacyCloud", "true");
expectLastCall().andAnswer(new IAnswer<String>() {
@Override
public Map answer() throws Throwable {
return new HashMap();
public String answer() throws Throwable {
return "true";
}
});

View File

@ -16,6 +16,15 @@
*/
package org.apache.solr.cloud;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.RegexFileFilter;
@ -23,6 +32,7 @@ import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.solr.SolrJettyTestBase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.VMParamsAllAndReadonlyDigestZkACLProvider;
import org.apache.solr.common.cloud.ZkConfigManager;
@ -37,15 +47,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
// TODO: This test would be a lot faster if it used a solrhome with fewer config
// files - there are a lot of them to upload
public class ZkCLITest extends SolrTestCaseJ4 {
@ -321,22 +322,19 @@ public class ZkCLITest extends SolrTestCaseJ4 {
@Test
public void testSetClusterProperty() throws Exception {
ZkStateReader reader = new ZkStateReader(zkClient);
try {
ClusterProperties properties = new ClusterProperties(zkClient);
// add property urlScheme=http
String[] args = new String[] {"-zkhost", zkServer.getZkAddress(),
"-cmd", "CLUSTERPROP", "-name", "urlScheme", "-val", "http"};
ZkCLI.main(args);
assertEquals("http", reader.getClusterProps().get("urlScheme"));
assertEquals("http", properties.getClusterProperty("urlScheme", "none"));
// remove it again
args = new String[] {"-zkhost", zkServer.getZkAddress(),
"-cmd", "CLUSTERPROP", "-name", "urlScheme"};
ZkCLI.main(args);
assertNull(reader.getClusterProps().get("urlScheme"));
} finally {
reader.close();
}
assertNull(properties.getClusterProperty("urlScheme", (String) null));
}
@Test

View File

@ -16,17 +16,15 @@
*/
package org.apache.solr.cloud;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.*;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
@ -40,12 +38,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slow
@SolrTestCaseJ4.SuppressSSL
public class ZkControllerTest extends SolrTestCaseJ4 {
@ -98,8 +90,13 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
ZkStateReader zkStateReader = new ZkStateReader(server.getZkAddress(), TIMEOUT, TIMEOUT);
try {
try (SolrZkClient client = new SolrZkClient(server.getZkAddress(), TIMEOUT)) {
ZkController.createClusterZkNodes(client);
try (ZkStateReader zkStateReader = new ZkStateReader(client)) {
zkStateReader.createClusterStateWatchersAndUpdate();
// getBaseUrlForNodeName
assertEquals("http://zzz.xxx:1234/solr",
zkStateReader.getBaseUrlForNodeName("zzz.xxx:1234_solr"));
@ -139,9 +136,16 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
zkStateReader.getBaseUrlForNodeName
(ZkController.generateNodeName("foo-bar.com", "80", "/some_dir")));
}
ClusterProperties cp = new ClusterProperties(client);
cp.setClusterProperty("urlScheme", "https");
//Verify the URL Scheme is taken into account
zkStateReader.getZkClient().create(ZkStateReader.CLUSTER_PROPS,
Utils.toJSON(Collections.singletonMap("urlScheme", "https")), CreateMode.PERSISTENT, true);
try (ZkStateReader zkStateReader = new ZkStateReader(client)) {
zkStateReader.createClusterStateWatchersAndUpdate();
assertEquals("https://zzz.xxx:1234/solr",
zkStateReader.getBaseUrlForNodeName("zzz.xxx:1234_solr"));
@ -149,8 +153,8 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
assertEquals("https://foo-bar.com:80/some_dir",
zkStateReader.getBaseUrlForNodeName
(ZkController.generateNodeName("foo-bar.com", "80", "/some_dir")));
} finally {
zkStateReader.close();
}
}
} finally {
server.shutdown();

View File

@ -21,21 +21,16 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.SolrPing;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.BeforeClass;
import org.junit.Test;
public class SearchHandlerTest extends SolrTestCaseJ4
{
@BeforeClass

View File

@ -0,0 +1,126 @@
package org.apache.solr.common.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
/**
* Interact with solr cluster properties
*
* Note that all methods on this class make calls to ZK on every invocation. For
* read-only eventually-consistent uses, clients should instead call
* {@link ZkStateReader#getClusterProperty(String, Object)}
*/
public class ClusterProperties {
private final SolrZkClient client;
/**
* Creates a ClusterProperties object using a provided SolrZkClient
*/
public ClusterProperties(SolrZkClient client) {
this.client = client;
}
/**
* Read the value of a cluster property, returning a default if it is not set
* @param key the property name
* @param defaultValue the default value
* @param <T> the type of the property
* @return the property value
* @throws IOException if there is an error reading the value from the cluster
*/
@SuppressWarnings("unchecked")
public <T> T getClusterProperty(String key, T defaultValue) throws IOException {
T value = (T) getClusterProperties().get(key);
if (value == null)
return defaultValue;
return value;
}
/**
* Return the cluster properties
* @throws IOException if there is an error reading properties from the cluster
*/
@SuppressWarnings("unchecked")
public Map<String, Object> getClusterProperties() throws IOException {
try {
return (Map<String, Object>) Utils.fromJSON(client.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true));
} catch (KeeperException.NoNodeException e) {
return Collections.emptyMap();
} catch (KeeperException | InterruptedException e) {
throw new IOException("Error reading cluster property", SolrZkClient.checkInterrupted(e));
}
}
/**
* This method sets a cluster property.
*
* @param propertyName The property name to be set.
* @param propertyValue The value of the property.
* @throws IOException if there is an error writing data to the cluster
*/
@SuppressWarnings("unchecked")
public void setClusterProperty(String propertyName, String propertyValue) throws IOException {
if (!ZkStateReader.KNOWN_CLUSTER_PROPS.contains(propertyName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Not a known cluster property " + propertyName);
}
for (; ; ) {
Stat s = new Stat();
try {
if (client.exists(ZkStateReader.CLUSTER_PROPS, true)) {
Map properties = (Map) Utils.fromJSON(client.getData(ZkStateReader.CLUSTER_PROPS, null, s, true));
if (propertyValue == null) {
//Don't update ZK unless absolutely necessary.
if (properties.get(propertyName) != null) {
properties.remove(propertyName);
client.setData(ZkStateReader.CLUSTER_PROPS, Utils.toJSON(properties), s.getVersion(), true);
}
} else {
//Don't update ZK unless absolutely necessary.
if (!propertyValue.equals(properties.get(propertyName))) {
properties.put(propertyName, propertyValue);
client.setData(ZkStateReader.CLUSTER_PROPS, Utils.toJSON(properties), s.getVersion(), true);
}
}
} else {
Map properties = new LinkedHashMap();
properties.put(propertyName, propertyValue);
client.create(ZkStateReader.CLUSTER_PROPS, Utils.toJSON(properties), CreateMode.PERSISTENT, true);
}
} catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
//race condition
continue;
} catch (InterruptedException | KeeperException e) {
throw new IOException("Error setting cluster property", SolrZkClient.checkInterrupted(e));
}
break;
}
}
}

View File

@ -46,7 +46,6 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@ -133,6 +132,8 @@ public class ZkStateReader implements Closeable {
private volatile Set<String> liveNodes = emptySet();
private volatile Map<String, Object> clusterProperties = Collections.emptyMap();
private final ZkConfigManager configManager;
private ConfigData securityData;
@ -363,6 +364,7 @@ public class ZkStateReader implements Closeable {
}
// on reconnect of SolrZkClient force refresh and re-add watches.
loadClusterProperties();
refreshLiveNodes(new LiveNodeWatcher());
refreshLegacyClusterState(new LegacyClusterStateWatcher());
refreshStateFormat2Collections();
@ -793,68 +795,46 @@ public class ZkStateReader implements Closeable {
this.aliases = ClusterState.load(data);
}
public Map getClusterProps() {
try {
if (getZkClient().exists(ZkStateReader.CLUSTER_PROPS, true)) {
return (Map) Utils.fromJSON(getZkClient().getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)) ;
} else {
return new LinkedHashMap();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, "Thread interrupted. Error reading cluster properties", e);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading cluster properties", e);
}
@SuppressWarnings("unchecked")
public <T> T getClusterProperty(String key, T defaultValue) {
T value = (T) clusterProperties.get(key);
if (value == null)
return defaultValue;
return value;
}
/**
* This method sets a cluster property.
*
* @param propertyName The property name to be set.
* @param propertyValue The value of the property.
*/
public void setClusterProperty(String propertyName, String propertyValue) {
if (!KNOWN_CLUSTER_PROPS.contains(propertyName)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Not a known cluster property " + propertyName);
public Map<String, Object> getClusterProperties() {
return Collections.unmodifiableMap(clusterProperties);
}
for (; ; ) {
Stat s = new Stat();
private final Watcher clusterPropertiesWatcher = event -> {
// session events are not change events, and do not remove the watcher
if (Watcher.Event.EventType.None.equals(event.getType())) {
return;
}
loadClusterProperties();
};
@SuppressWarnings("unchecked")
private void loadClusterProperties() {
try {
if (getZkClient().exists(CLUSTER_PROPS, true)) {
Map properties = (Map) Utils.fromJSON(getZkClient().getData(CLUSTER_PROPS, null, s, true));
if (propertyValue == null) {
//Don't update ZK unless absolutely necessary.
if (properties.get(propertyName) != null) {
properties.remove(propertyName);
getZkClient().setData(CLUSTER_PROPS, Utils.toJSON(properties), s.getVersion(), true);
}
} else {
//Don't update ZK unless absolutely necessary.
if (!propertyValue.equals(properties.get(propertyName))) {
properties.put(propertyName, propertyValue);
getZkClient().setData(CLUSTER_PROPS, Utils.toJSON(properties), s.getVersion(), true);
while (true) {
try {
byte[] data = zkClient.getData(ZkStateReader.CLUSTER_PROPS, clusterPropertiesWatcher, new Stat(), true);
this.clusterProperties = (Map<String, Object>) Utils.fromJSON(data);
LOG.info("Loaded cluster properties: {}", this.clusterProperties);
return;
} catch (KeeperException.NoNodeException e) {
this.clusterProperties = Collections.emptyMap();
LOG.info("Loaded empty cluster properties");
// set an exists watch, and if the node has been created since the last call,
// read the data again
if (zkClient.exists(ZkStateReader.CLUSTER_PROPS, clusterPropertiesWatcher, true) == null)
return;
}
}
} else {
Map properties = new LinkedHashMap();
properties.put(propertyName, propertyValue);
getZkClient().create(CLUSTER_PROPS, Utils.toJSON(properties), CreateMode.PERSISTENT, true);
}
} catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
LOG.warn("Race condition while trying to set a new cluster prop on current version [{}]", s.getVersion());
//race condition
continue;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Thread Interrupted. Error updating path [{}]", CLUSTER_PROPS, e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Thread Interrupted. Error updating cluster property " + propertyName, e);
} catch (KeeperException e) {
LOG.error("Error updating path [{}]", CLUSTER_PROPS, e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Error updating cluster property " + propertyName, e);
}
break;
} catch (KeeperException | InterruptedException e) {
LOG.error("Error reading cluster properties from zookeeper", SolrZkClient.checkInterrupted(e));
}
}
@ -898,10 +878,7 @@ public class ZkStateReader implements Closeable {
final String hostAndPort = nodeName.substring(0,_offset);
try {
final String path = URLDecoder.decode(nodeName.substring(1+_offset), "UTF-8");
String urlScheme = (String) getClusterProps().get(URL_SCHEME);
if(urlScheme == null) {
urlScheme = "http";
}
String urlScheme = getClusterProperty(URL_SCHEME, "http");
return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + path));
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException("JVM Does not seem to support UTF-8", e);