SOLR-5525

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1548503 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2013-12-06 12:58:35 +00:00
parent ad4b95a353
commit 0754f884a1
6 changed files with 88 additions and 45 deletions

View File

@ -46,6 +46,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonMap;
/**
* Cluster leader. Responsible node assignments, cluster state file?
*/
@ -611,11 +613,11 @@ public class Overseer {
List<DocRouter.Range> ranges = router.partitionRange(shards.size(), router.fullRange());
Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
// Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
newCollections.putAll(state.getCollectionStates());
// newCollections.putAll(state.getCollectionStates());
for (int i = 0; i < shards.size(); i++) {
String sliceName = shards.get(i);
/*}
@ -643,9 +645,10 @@ public class Overseer {
if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true");
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
newCollections.put(collectionName, newCollection);
ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newCollections);
return newClusterState;
// newCollections.put(collectionName, newCollection);
return state.copyWith(singletonMap(newCollection.getName(), newCollection));
// ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newCollections);
// return newClusterState;
}
/*
@ -771,6 +774,9 @@ public class Overseer {
newCollections.put(collectionName, newCollection);
return new ClusterState(state.getLiveNodes(), newCollections);
}
private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
return state.copyWith(colls);
}
/*
* Remove collection from cloudstate
@ -779,11 +785,11 @@ public class Overseer {
final String collection = message.getStr("name");
final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
newCollections.remove(collection);
// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
// newCollections.remove(collection);
ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
return newState;
// ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
return clusterState.copyWith(singletonMap(collection, (DocCollection)null));
}
/*
@ -795,16 +801,17 @@ public class Overseer {
log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
DocCollection coll = newCollections.get(collection);
// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
DocCollection coll = clusterState.getCollection(collection);
Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>(coll.getSlicesMap());
newSlices.remove(sliceId);
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
newCollections.put(newCollection.getName(), newCollection);
// newCollections.put(newCollection.getName(), newCollection);
return newState(clusterState, singletonMap(collection,newCollection));
return new ClusterState(clusterState.getLiveNodes(), newCollections);
// return new ClusterState(clusterState.getLiveNodes(), newCollections);
}
/*
@ -816,8 +823,9 @@ public class Overseer {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
DocCollection coll = newCollections.get(collection);
// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
// DocCollection coll = newCollections.get(collection);
DocCollection coll = clusterState.getCollectionOrNull(collection) ;
if (coll == null) {
// TODO: log/error that we didn't find it?
// just in case, remove the zk collection node
@ -866,7 +874,7 @@ public class Overseer {
// if there are no slices left in the collection, remove it?
if (newSlices.size() == 0) {
newCollections.remove(coll.getName());
// newCollections.remove(coll.getName());
// TODO: it might be better logically to have this in ZkController
// but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
@ -879,15 +887,18 @@ public class Overseer {
} catch (KeeperException e) {
SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e);
}
return newState(clusterState,singletonMap(collection, (DocCollection) null));
} else {
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
newCollections.put(newCollection.getName(), newCollection);
return newState(clusterState,singletonMap(collection,newCollection));
// newCollections.put(newCollection.getName(), newCollection);
}
ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
return newState;
// ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
// return newState;
}
@Override

View File

@ -58,7 +58,7 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
byte[] bytes = ZkStateReader.toJSON(clusterState);
// System.out.println("#################### " + new String(bytes));
ClusterState loadedClusterState = ClusterState.load(null, bytes, liveNodes);
ClusterState loadedClusterState = ClusterState.load(null, bytes, liveNodes,null);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
@ -66,13 +66,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
assertEquals("Poperties not copied properly", replica.getStr("prop1"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop1"));
assertEquals("Poperties not copied properly", replica.getStr("prop2"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop2"));
loadedClusterState = ClusterState.load(null, new byte[0], liveNodes);
loadedClusterState = ClusterState.load(null, new byte[0], liveNodes,null);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
assertEquals("Should not have collections", 0, loadedClusterState.getCollections().size());
loadedClusterState = ClusterState.load(null, (byte[])null, liveNodes);
loadedClusterState = ClusterState.load(null, (byte[])null, liveNodes,null);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());

View File

@ -51,7 +51,7 @@ public class SliceStateTest extends SolrTestCaseJ4 {
ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
byte[] bytes = ZkStateReader.toJSON(clusterState);
ClusterState loadedClusterState = ClusterState.load(null, bytes, liveNodes);
ClusterState loadedClusterState = ClusterState.load(null, bytes, liveNodes, null);
assertEquals("Default state not set to active", "active", loadedClusterState.getSlice("collection1", "shard1").getState());
}

View File

@ -37,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@ -142,8 +143,8 @@ public class SliceStateUpdateTest extends SolrTestCaseJ4 {
closeThread(updaterThread);
ClusterState clusterState = container1.getZkController().getClusterState();
Map<String, DocCollection> collectionStates =
new LinkedHashMap<String, DocCollection>(clusterState.getCollectionStates());
// Map<String, DocCollection> collectionStates =
// new LinkedHashMap<String, DocCollection>(clusterState.getCollectionStates());
Map<String, Slice> slicesMap = clusterState.getSlicesMap("collection1");
Map<String, Object> props = new HashMap<String, Object>(1);
@ -155,11 +156,11 @@ public class SliceStateUpdateTest extends SolrTestCaseJ4 {
props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name", ImplicitDocRouter.NAME));
DocCollection coll = new DocCollection("collection1", slicesMap, props, DocRouter.DEFAULT);
collectionStates.put("collection1", coll);
// collectionStates.put("collection1", coll);
SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(),
AbstractZkTestCase.TIMEOUT);
ClusterState newState = new ClusterState(clusterState.getLiveNodes(), collectionStates);
ClusterState newState = clusterState.copyWith(Collections.singletonMap(coll.getName(), coll) );
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(newState), true);
zkClient.close();

View File

@ -45,7 +45,9 @@ public class ClusterState implements JSONWriter.Writable {
private Integer zkClusterStateVersion;
private final Map<String, DocCollection> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
private final Set<String> liveNodes;
private Set<String> liveNodes;
private final ZkStateReader stateReader;
/**
* Use this constr when ClusterState is meant for publication.
@ -54,19 +56,43 @@ public class ClusterState implements JSONWriter.Writable {
*/
public ClusterState(Set<String> liveNodes,
Map<String, DocCollection> collectionStates) {
this(null, liveNodes, collectionStates);
this(null, liveNodes, collectionStates, null);
}
/**
* @deprecated
*/
public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
Map<String, DocCollection> collectionStates) {
this(zkClusterStateVersion, liveNodes, collectionStates,null);
}
/**
* Use this constr when ClusterState is meant for consumption.
*/
public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
Map<String, DocCollection> collectionStates) {
Map<String, DocCollection> collectionStates, ZkStateReader stateReader) {
this.zkClusterStateVersion = zkClusterStateVersion;
this.liveNodes = new HashSet<String>(liveNodes.size());
this.liveNodes.addAll(liveNodes);
this.collectionStates = new HashMap<String, DocCollection>(collectionStates.size());
this.collectionStates = new LinkedHashMap<String, DocCollection>(collectionStates.size());
this.collectionStates.putAll(collectionStates);
this.stateReader = stateReader;
}
public ClusterState copyWith(Map<String,DocCollection> modified){
ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates,stateReader);
for (Entry<String, DocCollection> e : modified.entrySet()) {
DocCollection c = e.getValue();
if(c == null) {
result.collectionStates.remove(e.getKey());
continue;
}
result.collectionStates.put(c.getName(), c);
}
return result;
}
@ -208,29 +234,28 @@ public class ClusterState implements JSONWriter.Writable {
/**
* Create ClusterState by reading the current state from zookeeper.
*/
public static ClusterState load(SolrZkClient zkClient, Set<String> liveNodes) throws KeeperException, InterruptedException {
public static ClusterState load(SolrZkClient zkClient, Set<String> liveNodes, ZkStateReader stateReader) throws KeeperException, InterruptedException {
Stat stat = new Stat();
byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
null, stat, true);
return load(stat.getVersion(), state, liveNodes);
return load(stat.getVersion(), state, liveNodes, stateReader);
}
/**
* Create ClusterState from json string that is typically stored in zookeeper.
*
* Use {@link ClusterState#load(SolrZkClient, Set)} instead, unless you want to
* Use {@link ClusterState#load(SolrZkClient, Set, ZkStateReader)} instead, unless you want to
* do something more when getting the data - such as get the stat, set watch, etc.
*
* @param version zk version of the clusterstate.json file (bytes)
* @param bytes clusterstate.json as a byte array
* @param liveNodes list of live nodes
* @return the ClusterState
*/
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, ZkStateReader stateReader) {
// System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
if (bytes == null || bytes.length == 0) {
return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap(),stateReader);
}
Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
Map<String,DocCollection> collections = new LinkedHashMap<String,DocCollection>(stateMap.size());
@ -337,7 +362,11 @@ public class ClusterState implements JSONWriter.Writable {
return true;
}
/**Internal API used only by ZkStateReader
* @param liveNodes
*/
void setLiveNodes(Set<String> liveNodes){
this.liveNodes = liveNodes;
}
}

View File

@ -254,7 +254,7 @@ public class ZkStateReader {
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
true);
Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln);
ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln,ZkStateReader.this);
// update volatile
ZkStateReader.this.clusterState = clusterState;
}
@ -326,7 +326,7 @@ public class ZkStateReader {
Set<String> liveNodeSet = new HashSet<String>();
liveNodeSet.addAll(liveNodes);
ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet);
ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
this.clusterState = clusterState;
zkClient.exists(ALIASES,
@ -393,12 +393,14 @@ public class ZkStateReader {
if (!onlyLiveNodes) {
log.info("Updating cloud state from ZooKeeper... ");
clusterState = ClusterState.load(zkClient, liveNodesSet);
clusterState = ClusterState.load(zkClient, liveNodesSet,this);
} else {
log.info("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
clusterState = new ClusterState(
clusterState = this.clusterState;
clusterState.setLiveNodes(liveNodesSet);
/*clusterState = new ClusterState(
ZkStateReader.this.clusterState.getZkClusterStateVersion(), liveNodesSet,
ZkStateReader.this.clusterState.getCollectionStates());
ZkStateReader.this.clusterState.getCollectionStates());*/
}
this.clusterState = clusterState;
}
@ -427,7 +429,7 @@ public class ZkStateReader {
if (!onlyLiveNodes) {
log.info("Updating cloud state from ZooKeeper... ");
clusterState = ClusterState.load(zkClient, liveNodesSet);
clusterState = ClusterState.load(zkClient, liveNodesSet,ZkStateReader.this);
} else {
log.info("Updating live nodes from ZooKeeper... ");
clusterState = new ClusterState(ZkStateReader.this.clusterState.getZkClusterStateVersion(), liveNodesSet, ZkStateReader.this.clusterState.getCollectionStates());