mirror of https://github.com/apache/lucene.git
SOLR-3709: Cache the url list created from the ClusterState in CloudSolrServer on each request.
SOLR-3708: Add hashcode to ClusterState so that structures built based on the ClusterState can be easily cached. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1369465 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
37153b5c34
commit
238c4274b6
|
@ -109,6 +109,15 @@ New Features
|
|||
* SOLR-3672: SimplePostTool: Improvements for posting files
|
||||
Support for auto mode, recursive and wildcards (janhoy)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
* SOLR-3708: Add hashCode to ClusterState so that structures built based on the
|
||||
ClusterState can be easily cached. (Mark Miller)
|
||||
|
||||
* SOLR-3709: Cache the url list created from the ClusterState in CloudSolrServer on each
|
||||
request. (Mark Miller, yonik)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
|
|||
ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
|
||||
byte[] bytes = ZkStateReader.toJSON(clusterState);
|
||||
|
||||
ClusterState loadedClusterState = ClusterState.load(bytes, liveNodes);
|
||||
ClusterState loadedClusterState = ClusterState.load(null, bytes, liveNodes);
|
||||
|
||||
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
|
||||
.getLiveNodes().size());
|
||||
|
@ -63,13 +63,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
|
|||
assertEquals("Poperties not copied properly", zkNodeProps.get("prop1"), loadedClusterState.getSlice("collection1", "shard1").getShards().get("node1").get("prop1"));
|
||||
assertEquals("Poperties not copied properly", zkNodeProps.get("prop2"), loadedClusterState.getSlice("collection1", "shard1").getShards().get("node1").get("prop2"));
|
||||
|
||||
loadedClusterState = ClusterState.load(new byte[0], liveNodes);
|
||||
loadedClusterState = ClusterState.load(null, new byte[0], liveNodes);
|
||||
|
||||
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
|
||||
.getLiveNodes().size());
|
||||
assertEquals("Should not have collections", 0, loadedClusterState.getCollections().size());
|
||||
|
||||
loadedClusterState = ClusterState.load((byte[])null, liveNodes);
|
||||
loadedClusterState = ClusterState.load(null, (byte[])null, liveNodes);
|
||||
|
||||
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
|
||||
.getLiveNodes().size());
|
||||
|
|
|
@ -61,6 +61,10 @@ public class CloudSolrServer extends SolrServer {
|
|||
private LBHttpSolrServer lbServer;
|
||||
private HttpClient myClient;
|
||||
Random rand = new Random();
|
||||
|
||||
// since the state shouldn't change often, should be very cheap reads
|
||||
private volatile List<String> urlList;
|
||||
private volatile int lastClusterStateHashCode;
|
||||
/**
|
||||
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
|
||||
* in the form HOST:PORT.
|
||||
|
@ -168,22 +172,26 @@ public class CloudSolrServer extends SolrServer {
|
|||
// or shardAddressVersion (which only changes when the shards change)
|
||||
// to allow caching.
|
||||
|
||||
// build a map of unique nodes
|
||||
// TODO: allow filtering by group, role, etc
|
||||
Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
|
||||
List<String> urlList = new ArrayList<String>();
|
||||
for (Slice slice : slices.values()) {
|
||||
for (ZkNodeProps nodeProps : slice.getShards().values()) {
|
||||
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
|
||||
String node = coreNodeProps.getNodeName();
|
||||
if (!liveNodes.contains(coreNodeProps.getNodeName())
|
||||
|| !coreNodeProps.getState().equals(
|
||||
ZkStateReader.ACTIVE)) continue;
|
||||
if (nodes.put(node, nodeProps) == null) {
|
||||
String url = coreNodeProps.getCoreUrl();
|
||||
urlList.add(url);
|
||||
if (clusterState.hashCode() != this.lastClusterStateHashCode) {
|
||||
|
||||
// build a map of unique nodes
|
||||
// TODO: allow filtering by group, role, etc
|
||||
Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
|
||||
List<String> urlList = new ArrayList<String>();
|
||||
for (Slice slice : slices.values()) {
|
||||
for (ZkNodeProps nodeProps : slice.getShards().values()) {
|
||||
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
|
||||
String node = coreNodeProps.getNodeName();
|
||||
if (!liveNodes.contains(coreNodeProps.getNodeName())
|
||||
|| !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
|
||||
if (nodes.put(node, nodeProps) == null) {
|
||||
String url = coreNodeProps.getCoreUrl();
|
||||
urlList.add(url);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.urlList = urlList;
|
||||
this.lastClusterStateHashCode = clusterState.hashCode();
|
||||
}
|
||||
|
||||
Collections.shuffle(urlList, rand);
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.solr.common.SolrException;
|
|||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.HashPartitioner.Range;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -42,25 +43,49 @@ import org.slf4j.LoggerFactory;
|
|||
public class ClusterState implements JSONWriter.Writable {
|
||||
private static Logger log = LoggerFactory.getLogger(ClusterState.class);
|
||||
|
||||
private final Map<String, Map<String,Slice>> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
|
||||
private final Set<String> liveNodes;
|
||||
private Integer zkClusterStateVersion;
|
||||
|
||||
private final Map<String, Map<String,Slice>> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
|
||||
private final Set<String> liveNodes;
|
||||
|
||||
private final HashPartitioner hp = new HashPartitioner();
|
||||
|
||||
private final Map<String,RangeInfo> rangeInfos = new HashMap<String,RangeInfo>();
|
||||
private final Map<String,Map<String,ZkNodeProps>> leaders = new HashMap<String,Map<String,ZkNodeProps>>();
|
||||
|
||||
public ClusterState(Set<String> liveNodes,
|
||||
Map<String, Map<String,Slice>> collectionStates) {
|
||||
this.liveNodes = new HashSet<String>(liveNodes.size());
|
||||
this.liveNodes.addAll(liveNodes);
|
||||
this.collectionStates = new HashMap<String, Map<String,Slice>>(collectionStates.size());
|
||||
this.collectionStates.putAll(collectionStates);
|
||||
addRangeInfos(collectionStates.keySet());
|
||||
getShardLeaders();
|
||||
}
|
||||
|
||||
private void getShardLeaders() {
|
||||
|
||||
/**
|
||||
* Use this constr when ClusterState is meant for publication.
|
||||
*
|
||||
* hashCode and equals will only depend on liveNodes and not clusterStateVersion.
|
||||
*
|
||||
* @param liveNodes
|
||||
* @param collectionStates
|
||||
*/
|
||||
public ClusterState(Set<String> liveNodes,
|
||||
Map<String, Map<String,Slice>> collectionStates) {
|
||||
this(null, liveNodes, collectionStates);
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this constr when ClusterState is meant for consumption.
|
||||
*
|
||||
* @param zkClusterStateVersion
|
||||
* @param liveNodes
|
||||
* @param collectionStates
|
||||
*/
|
||||
public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
|
||||
Map<String, Map<String,Slice>> collectionStates) {
|
||||
this.liveNodes = new HashSet<String>(liveNodes.size());
|
||||
this.liveNodes.addAll(liveNodes);
|
||||
this.collectionStates = new HashMap<String, Map<String,Slice>>(collectionStates.size());
|
||||
this.collectionStates.putAll(collectionStates);
|
||||
addRangeInfos(collectionStates.keySet());
|
||||
getShardLeaders();
|
||||
}
|
||||
|
||||
private void getShardLeaders() {
|
||||
Set<Entry<String,Map<String,Slice>>> collections = collectionStates.entrySet();
|
||||
for (Entry<String,Map<String,Slice>> collection : collections) {
|
||||
Map<String,Slice> state = collection.getValue();
|
||||
|
@ -85,27 +110,27 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get properties of a shard leader for specific collection.
|
||||
*/
|
||||
public ZkNodeProps getLeader(String collection, String shard) {
|
||||
Map<String,ZkNodeProps> collectionLeaders = leaders.get(collection);
|
||||
if (collectionLeaders == null) return null;
|
||||
return collectionLeaders.get(shard);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get shard properties or null if shard is not found.
|
||||
*/
|
||||
public ZkNodeProps getShardProps(final String collection, final String coreNodeName) {
|
||||
Map<String, Slice> slices = getSlices(collection);
|
||||
for(Slice slice: slices.values()) {
|
||||
if(slice.getShards().get(coreNodeName)!=null) {
|
||||
return slice.getShards().get(coreNodeName);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
/**
|
||||
* Get properties of a shard leader for specific collection.
|
||||
*/
|
||||
public ZkNodeProps getLeader(String collection, String shard) {
|
||||
Map<String,ZkNodeProps> collectionLeaders = leaders.get(collection);
|
||||
if (collectionLeaders == null) return null;
|
||||
return collectionLeaders.get(shard);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get shard properties or null if shard is not found.
|
||||
*/
|
||||
public ZkNodeProps getShardProps(final String collection, final String coreNodeName) {
|
||||
Map<String, Slice> slices = getSlices(collection);
|
||||
for(Slice slice: slices.values()) {
|
||||
if(slice.getShards().get(coreNodeName)!=null) {
|
||||
return slice.getShards().get(coreNodeName);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void addRangeInfos(Set<String> collections) {
|
||||
for (String collection : collections) {
|
||||
|
@ -117,72 +142,72 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
* Get the index Slice for collection.
|
||||
*/
|
||||
public Slice getSlice(String collection, String slice) {
|
||||
if (collectionStates.containsKey(collection)
|
||||
&& collectionStates.get(collection).containsKey(slice))
|
||||
return collectionStates.get(collection).get(slice);
|
||||
return null;
|
||||
}
|
||||
if (collectionStates.containsKey(collection)
|
||||
&& collectionStates.get(collection).containsKey(slice))
|
||||
return collectionStates.get(collection).get(slice);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all slices for collection.
|
||||
*/
|
||||
public Map<String, Slice> getSlices(String collection) {
|
||||
if(!collectionStates.containsKey(collection))
|
||||
return null;
|
||||
return Collections.unmodifiableMap(collectionStates.get(collection));
|
||||
}
|
||||
public Map<String, Slice> getSlices(String collection) {
|
||||
if(!collectionStates.containsKey(collection))
|
||||
return null;
|
||||
return Collections.unmodifiableMap(collectionStates.get(collection));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get collection names.
|
||||
*/
|
||||
public Set<String> getCollections() {
|
||||
return Collections.unmodifiableSet(collectionStates.keySet());
|
||||
}
|
||||
/**
|
||||
* Get collection names.
|
||||
*/
|
||||
public Set<String> getCollections() {
|
||||
return Collections.unmodifiableSet(collectionStates.keySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Map<collectionName, Map<sliceName,Slice>>
|
||||
*/
|
||||
public Map<String, Map<String, Slice>> getCollectionStates() {
|
||||
return Collections.unmodifiableMap(collectionStates);
|
||||
}
|
||||
/**
|
||||
* @return Map<collectionName, Map<sliceName,Slice>>
|
||||
*/
|
||||
public Map<String, Map<String, Slice>> getCollectionStates() {
|
||||
return Collections.unmodifiableMap(collectionStates);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get names of the currently live nodes.
|
||||
*/
|
||||
public Set<String> getLiveNodes() {
|
||||
return Collections.unmodifiableSet(liveNodes);
|
||||
}
|
||||
/**
|
||||
* Get names of the currently live nodes.
|
||||
*/
|
||||
public Set<String> getLiveNodes() {
|
||||
return Collections.unmodifiableSet(liveNodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get shardId for core.
|
||||
* @param coreNodeName in the form of nodeName_coreName
|
||||
*/
|
||||
public String getShardId(String coreNodeName) {
|
||||
for (Entry<String, Map<String, Slice>> states: collectionStates.entrySet()){
|
||||
for(Entry<String, Slice> slices: states.getValue().entrySet()) {
|
||||
for(Entry<String, ZkNodeProps> shards: slices.getValue().getShards().entrySet()){
|
||||
if(coreNodeName.equals(shards.getKey())) {
|
||||
return slices.getKey();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
/**
|
||||
* Get shardId for core.
|
||||
* @param coreNodeName in the form of nodeName_coreName
|
||||
*/
|
||||
public String getShardId(String coreNodeName) {
|
||||
for (Entry<String, Map<String, Slice>> states: collectionStates.entrySet()){
|
||||
for(Entry<String, Slice> slices: states.getValue().entrySet()) {
|
||||
for(Entry<String, ZkNodeProps> shards: slices.getValue().getShards().entrySet()){
|
||||
if(coreNodeName.equals(shards.getKey())) {
|
||||
return slices.getKey();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if node is alive.
|
||||
*/
|
||||
public boolean liveNodesContain(String name) {
|
||||
return liveNodes.contains(name);
|
||||
}
|
||||
|
||||
public RangeInfo getRanges(String collection) {
|
||||
/**
|
||||
* Check if node is alive.
|
||||
*/
|
||||
public boolean liveNodesContain(String name) {
|
||||
return liveNodes.contains(name);
|
||||
}
|
||||
|
||||
public RangeInfo getRanges(String collection) {
|
||||
// TODO: store this in zk
|
||||
RangeInfo rangeInfo = rangeInfos.get(collection);
|
||||
|
||||
return rangeInfo;
|
||||
}
|
||||
return rangeInfo;
|
||||
}
|
||||
|
||||
private RangeInfo addRangeInfo(String collection) {
|
||||
List<Range> ranges;
|
||||
|
@ -227,29 +252,39 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
throw new IllegalStateException("The HashPartitioner failed");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("live nodes:" + liveNodes);
|
||||
sb.append(" collections:" + collectionStates);
|
||||
return sb.toString();
|
||||
}
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("live nodes:" + liveNodes);
|
||||
sb.append(" collections:" + collectionStates);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create ClusterState by reading the current state from zookeeper.
|
||||
*/
|
||||
public static ClusterState load(SolrZkClient zkClient, Set<String> liveNodes) throws KeeperException, InterruptedException {
|
||||
/**
|
||||
* Create ClusterState by reading the current state from zookeeper.
|
||||
*/
|
||||
public static ClusterState load(SolrZkClient zkClient, Set<String> liveNodes) throws KeeperException, InterruptedException {
|
||||
Stat stat = new Stat();
|
||||
byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
|
||||
null, null, true);
|
||||
return load(state, liveNodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create ClusterState from json string that is typically stored in zookeeper.
|
||||
*/
|
||||
public static ClusterState load(byte[] bytes, Set<String> liveNodes) {
|
||||
null, stat, true);
|
||||
return load(stat.getVersion(), state, liveNodes);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create ClusterState from json string that is typically stored in zookeeper.
|
||||
*
|
||||
* Use {@link ClusterState#load(SolrZkClient, Set)} instead, unless you want to
|
||||
* do something more when getting the data - such as get the stat, set watch, etc.
|
||||
*
|
||||
* @param version zk version of the clusterstate.json file (bytes)
|
||||
* @param bytes clusterstate.json as a byte array
|
||||
* @param liveNodes list of live nodes
|
||||
* @return the ClusterState
|
||||
*/
|
||||
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
return new ClusterState(liveNodes, Collections.<String, Map<String,Slice>>emptyMap());
|
||||
return new ClusterState(version, liveNodes, Collections.<String, Map<String,Slice>>emptyMap());
|
||||
}
|
||||
|
||||
LinkedHashMap<String, Object> stateMap = (LinkedHashMap<String, Object>) ZkStateReader.fromJSON(bytes);
|
||||
|
@ -269,8 +304,8 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
}
|
||||
state.put(collectionName, slices);
|
||||
}
|
||||
return new ClusterState(liveNodes, state);
|
||||
}
|
||||
return new ClusterState(version, liveNodes, state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(JSONWriter jsonWriter) {
|
||||
|
@ -282,5 +317,41 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
private ArrayList<String> shardList;
|
||||
}
|
||||
|
||||
/**
|
||||
* The version of clusterstate.json in ZooKeeper.
|
||||
*
|
||||
* @return null if ClusterState was created for publication, not consumption
|
||||
*/
|
||||
public Integer getZkClusterStateVersion() {
|
||||
return zkClusterStateVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result
|
||||
+ ((zkClusterStateVersion == null) ? 0 : zkClusterStateVersion.hashCode());
|
||||
result = prime * result + ((liveNodes == null) ? 0 : liveNodes.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) return true;
|
||||
if (obj == null) return false;
|
||||
if (getClass() != obj.getClass()) return false;
|
||||
ClusterState other = (ClusterState) obj;
|
||||
if (zkClusterStateVersion == null) {
|
||||
if (other.zkClusterStateVersion != null) return false;
|
||||
} else if (!zkClusterStateVersion.equals(other.zkClusterStateVersion)) return false;
|
||||
if (liveNodes == null) {
|
||||
if (other.liveNodes != null) return false;
|
||||
} else if (!liveNodes.equals(other.liveNodes)) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.zookeeper.KeeperException;
|
|||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.Watcher.Event.EventType;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -193,10 +194,11 @@ public class ZkStateReader {
|
|||
synchronized (ZkStateReader.this.getUpdateLock()) {
|
||||
// remake watch
|
||||
final Watcher thisWatch = this;
|
||||
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, null,
|
||||
Stat stat = new Stat();
|
||||
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
|
||||
true);
|
||||
|
||||
ClusterState clusterState = ClusterState.load(data,
|
||||
ClusterState clusterState = ClusterState.load(stat.getVersion(), data,
|
||||
ZkStateReader.this.clusterState.getLiveNodes());
|
||||
// update volatile
|
||||
ZkStateReader.this.clusterState = clusterState;
|
||||
|
@ -242,8 +244,10 @@ public class ZkStateReader {
|
|||
LIVE_NODES_ZKNODE, this, true);
|
||||
Set<String> liveNodesSet = new HashSet<String>();
|
||||
liveNodesSet.addAll(liveNodes);
|
||||
ClusterState clusterState = new ClusterState(liveNodesSet,
|
||||
ZkStateReader.this.clusterState.getCollectionStates());
|
||||
ClusterState clusterState = new ClusterState(
|
||||
ZkStateReader.this.clusterState.getZkClusterStateVersion(),
|
||||
liveNodesSet, ZkStateReader.this.clusterState
|
||||
.getCollectionStates());
|
||||
ZkStateReader.this.clusterState = clusterState;
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
|
@ -293,7 +297,8 @@ public class ZkStateReader {
|
|||
clusterState = ClusterState.load(zkClient, liveNodesSet);
|
||||
} else {
|
||||
log.info("Updating live nodes from ZooKeeper... ");
|
||||
clusterState = new ClusterState(liveNodesSet,
|
||||
clusterState = new ClusterState(
|
||||
ZkStateReader.this.clusterState.getZkClusterStateVersion(), liveNodesSet,
|
||||
ZkStateReader.this.clusterState.getCollectionStates());
|
||||
}
|
||||
}
|
||||
|
@ -325,7 +330,7 @@ public class ZkStateReader {
|
|||
clusterState = ClusterState.load(zkClient, liveNodesSet);
|
||||
} else {
|
||||
log.info("Updating live nodes from ZooKeeper... ");
|
||||
clusterState = new ClusterState(liveNodesSet, ZkStateReader.this.clusterState.getCollectionStates());
|
||||
clusterState = new ClusterState(ZkStateReader.this.clusterState .getZkClusterStateVersion(), liveNodesSet, ZkStateReader.this.clusterState.getCollectionStates());
|
||||
}
|
||||
|
||||
ZkStateReader.this.clusterState = clusterState;
|
||||
|
|
Loading…
Reference in New Issue