mirror of https://github.com/apache/lucene.git
SOLR-5811: Additional cleanup.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1574580 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3e2a817539
commit
e88091b3dd
|
@ -311,7 +311,7 @@ public class Overseer {
|
||||||
private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) {
|
||||||
log.info("createReplica() {} ", message);
|
log.info("createReplica() {} ", message);
|
||||||
String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
checkCollection(message, coll);
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||||
Slice sl = clusterState.getSlice(coll, slice);
|
Slice sl = clusterState.getSlice(coll, slice);
|
||||||
if(sl == null){
|
if(sl == null){
|
||||||
|
@ -352,7 +352,7 @@ public class Overseer {
|
||||||
|
|
||||||
private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
|
||||||
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
checkCollection(message, collection);
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
|
log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
|
||||||
for (String key : message.keySet()) {
|
for (String key : message.keySet()) {
|
||||||
if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
|
if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
|
||||||
|
@ -377,7 +377,7 @@ public class Overseer {
|
||||||
|
|
||||||
private ClusterState addRoutingRule(ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState addRoutingRule(ClusterState clusterState, ZkNodeProps message) {
|
||||||
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
checkCollection(message, collection);
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||||
String routeKey = message.getStr("routeKey");
|
String routeKey = message.getStr("routeKey");
|
||||||
String range = message.getStr("range");
|
String range = message.getStr("range");
|
||||||
|
@ -417,15 +417,22 @@ public class Overseer {
|
||||||
return clusterState;
|
return clusterState;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkCollection(ZkNodeProps message, String collection) {
|
private boolean checkCollectionKeyExistence(ZkNodeProps message) {
|
||||||
if (collection == null || collection.trim().length() == 0) {
|
return checkKeyExistence(message, ZkStateReader.COLLECTION_PROP);
|
||||||
log.error("Skipping invalid Overseer message because it has no collection specified: " + message);
|
}
|
||||||
|
|
||||||
|
private boolean checkKeyExistence(ZkNodeProps message, String key) {
|
||||||
|
String value = message.getStr(key);
|
||||||
|
if (value == null || value.trim().length() == 0) {
|
||||||
|
log.error("Skipping invalid Overseer message because it has no " + key + " specified: " + message);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterState removeRoutingRule(ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState removeRoutingRule(ClusterState clusterState, ZkNodeProps message) {
|
||||||
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
checkCollection(message, collection);
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||||
String routeKeyStr = message.getStr("routeKey");
|
String routeKeyStr = message.getStr("routeKey");
|
||||||
|
|
||||||
|
@ -451,7 +458,7 @@ public class Overseer {
|
||||||
|
|
||||||
private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
|
||||||
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
checkCollection(message, collection);
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||||
Slice slice = clusterState.getSlice(collection, shardId);
|
Slice slice = clusterState.getSlice(collection, shardId);
|
||||||
if (slice == null) {
|
if (slice == null) {
|
||||||
|
@ -498,7 +505,7 @@ public class Overseer {
|
||||||
|
|
||||||
private ClusterState updateStateNew(ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState updateStateNew(ClusterState clusterState, ZkNodeProps message) {
|
||||||
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
checkCollection(message, collection);
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||||
|
|
||||||
if(collection==null || sliceName == null){
|
if(collection==null || sliceName == null){
|
||||||
|
@ -517,30 +524,30 @@ public class Overseer {
|
||||||
/**
|
/**
|
||||||
* Try to assign core to the cluster.
|
* Try to assign core to the cluster.
|
||||||
*/
|
*/
|
||||||
private ClusterState updateState(ClusterState state, final ZkNodeProps message) {
|
private ClusterState updateState(ClusterState clusterState, final ZkNodeProps message) {
|
||||||
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
checkCollection(message, collection);
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
|
Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
|
||||||
log.info("Update state numShards={} message={}", numShards, message);
|
log.info("Update state numShards={} message={}", numShards, message);
|
||||||
|
|
||||||
List<String> shardNames = new ArrayList<String>();
|
List<String> shardNames = new ArrayList<String>();
|
||||||
|
|
||||||
//collection does not yet exist, create placeholders if num shards is specified
|
//collection does not yet exist, create placeholders if num shards is specified
|
||||||
boolean collectionExists = state.hasCollection(collection);
|
boolean collectionExists = clusterState.hasCollection(collection);
|
||||||
if (!collectionExists && numShards!=null) {
|
if (!collectionExists && numShards!=null) {
|
||||||
getShardNames(numShards, shardNames);
|
getShardNames(numShards, shardNames);
|
||||||
state = createCollection(state, collection, shardNames, message);
|
clusterState = createCollection(clusterState, collection, shardNames, message);
|
||||||
}
|
}
|
||||||
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||||
|
|
||||||
String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
|
String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
|
||||||
if (coreNodeName == null) {
|
if (coreNodeName == null) {
|
||||||
coreNodeName = getAssignedCoreNodeName(state, message);
|
coreNodeName = getAssignedCoreNodeName(clusterState, message);
|
||||||
if (coreNodeName != null) {
|
if (coreNodeName != null) {
|
||||||
log.info("node=" + coreNodeName + " is already registered");
|
log.info("node=" + coreNodeName + " is already registered");
|
||||||
} else {
|
} else {
|
||||||
// if coreNodeName is null, auto assign one
|
// if coreNodeName is null, auto assign one
|
||||||
coreNodeName = Assign.assignNode(collection, state);
|
coreNodeName = Assign.assignNode(collection, clusterState);
|
||||||
}
|
}
|
||||||
message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP,
|
message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP,
|
||||||
coreNodeName);
|
coreNodeName);
|
||||||
|
@ -549,7 +556,7 @@ public class Overseer {
|
||||||
// use the provided non null shardId
|
// use the provided non null shardId
|
||||||
if (sliceName == null) {
|
if (sliceName == null) {
|
||||||
//get shardId from ClusterState
|
//get shardId from ClusterState
|
||||||
sliceName = getAssignedId(state, coreNodeName, message);
|
sliceName = getAssignedId(clusterState, coreNodeName, message);
|
||||||
if (sliceName != null) {
|
if (sliceName != null) {
|
||||||
log.info("shard=" + sliceName + " is already registered");
|
log.info("shard=" + sliceName + " is already registered");
|
||||||
}
|
}
|
||||||
|
@ -558,14 +565,14 @@ public class Overseer {
|
||||||
//request new shardId
|
//request new shardId
|
||||||
if (collectionExists) {
|
if (collectionExists) {
|
||||||
// use existing numShards
|
// use existing numShards
|
||||||
numShards = state.getCollection(collection).getSlices().size();
|
numShards = clusterState.getCollection(collection).getSlices().size();
|
||||||
log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
|
log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
|
||||||
}
|
}
|
||||||
sliceName = Assign.assignShard(collection, state, numShards);
|
sliceName = Assign.assignShard(collection, clusterState, numShards);
|
||||||
log.info("Assigning new node to shard shard=" + sliceName);
|
log.info("Assigning new node to shard shard=" + sliceName);
|
||||||
}
|
}
|
||||||
|
|
||||||
Slice slice = state.getSlice(collection, sliceName);
|
Slice slice = clusterState.getSlice(collection, sliceName);
|
||||||
|
|
||||||
Map<String,Object> replicaProps = new LinkedHashMap<String,Object>();
|
Map<String,Object> replicaProps = new LinkedHashMap<String,Object>();
|
||||||
|
|
||||||
|
@ -611,9 +618,9 @@ public class Overseer {
|
||||||
Map<String,Replica> replicas;
|
Map<String,Replica> replicas;
|
||||||
|
|
||||||
if (slice != null) {
|
if (slice != null) {
|
||||||
state = checkAndCompleteShardSplit(state, collection, coreNodeName, sliceName, replicaProps);
|
clusterState = checkAndCompleteShardSplit(clusterState, collection, coreNodeName, sliceName, replicaProps);
|
||||||
// get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
|
// get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
|
||||||
slice = state.getSlice(collection, sliceName);
|
slice = clusterState.getSlice(collection, sliceName);
|
||||||
sliceProps = slice.getProperties();
|
sliceProps = slice.getProperties();
|
||||||
replicas = slice.getReplicasCopy();
|
replicas = slice.getReplicasCopy();
|
||||||
} else {
|
} else {
|
||||||
|
@ -627,7 +634,7 @@ public class Overseer {
|
||||||
replicas.put(replica.getName(), replica);
|
replicas.put(replica.getName(), replica);
|
||||||
slice = new Slice(sliceName, replicas, sliceProps);
|
slice = new Slice(sliceName, replicas, sliceProps);
|
||||||
|
|
||||||
ClusterState newClusterState = updateSlice(state, collection, slice);
|
ClusterState newClusterState = updateSlice(clusterState, collection, slice);
|
||||||
return newClusterState;
|
return newClusterState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -876,11 +883,9 @@ public class Overseer {
|
||||||
* Remove collection from cloudstate
|
* Remove collection from cloudstate
|
||||||
*/
|
*/
|
||||||
private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
|
||||||
|
|
||||||
final String collection = message.getStr("name");
|
final String collection = message.getStr("name");
|
||||||
checkCollection(message, collection);
|
if (!checkKeyExistence(message, "name")) return clusterState;
|
||||||
|
|
||||||
// ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
|
|
||||||
return clusterState.copyWith(singletonMap(collection, (DocCollection)null));
|
return clusterState.copyWith(singletonMap(collection, (DocCollection)null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -888,34 +893,28 @@ public class Overseer {
|
||||||
* Remove collection slice from cloudstate
|
* Remove collection slice from cloudstate
|
||||||
*/
|
*/
|
||||||
private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
|
||||||
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
|
||||||
checkCollection(message, collection);
|
|
||||||
final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||||
|
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
|
|
||||||
log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
|
log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
|
||||||
|
|
||||||
// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
|
|
||||||
DocCollection coll = clusterState.getCollection(collection);
|
DocCollection coll = clusterState.getCollection(collection);
|
||||||
|
|
||||||
Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>(coll.getSlicesMap());
|
Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>(coll.getSlicesMap());
|
||||||
newSlices.remove(sliceId);
|
newSlices.remove(sliceId);
|
||||||
|
|
||||||
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
|
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
|
||||||
// newCollections.put(newCollection.getName(), newCollection);
|
|
||||||
return newState(clusterState, singletonMap(collection,newCollection));
|
return newState(clusterState, singletonMap(collection,newCollection));
|
||||||
|
|
||||||
// return new ClusterState(clusterState.getLiveNodes(), newCollections);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Remove core from cloudstate
|
* Remove core from cloudstate
|
||||||
*/
|
*/
|
||||||
private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {
|
||||||
|
final String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
|
||||||
String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
|
|
||||||
|
|
||||||
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
checkCollection(message, collection);
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
|
|
||||||
// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
|
// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
|
||||||
// DocCollection coll = newCollections.get(collection);
|
// DocCollection coll = newCollections.get(collection);
|
||||||
|
|
|
@ -1013,7 +1013,8 @@ public final class ZkController {
|
||||||
core.close();
|
core.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("publishing core={} state={}", cd.getName(), state);
|
String collection = cd.getCloudDescriptor().getCollectionName();
|
||||||
|
log.info("publishing core={} state={} collection={}", cd.getName(), state, collection);
|
||||||
//System.out.println(Thread.currentThread().getStackTrace()[3]);
|
//System.out.println(Thread.currentThread().getStackTrace()[3]);
|
||||||
Integer numShards = cd.getCloudDescriptor().getNumShards();
|
Integer numShards = cd.getCloudDescriptor().getNumShards();
|
||||||
if (numShards == null) { //XXX sys prop hack
|
if (numShards == null) { //XXX sys prop hack
|
||||||
|
@ -1021,8 +1022,7 @@ public final class ZkController {
|
||||||
numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
|
numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert cd.getCloudDescriptor().getCollectionName() != null && cd.getCloudDescriptor()
|
assert collection != null && collection.length() > 0;
|
||||||
.getCollectionName().length() > 0;
|
|
||||||
|
|
||||||
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
|
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
|
||||||
//assert cd.getCloudDescriptor().getShardId() != null;
|
//assert cd.getCloudDescriptor().getShardId() != null;
|
||||||
|
@ -1033,12 +1033,9 @@ public final class ZkController {
|
||||||
ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles(),
|
ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles(),
|
||||||
ZkStateReader.NODE_NAME_PROP, getNodeName(),
|
ZkStateReader.NODE_NAME_PROP, getNodeName(),
|
||||||
ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(),
|
ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(),
|
||||||
ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor()
|
ZkStateReader.COLLECTION_PROP, collection,
|
||||||
.getCollectionName(),
|
ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString() : null,
|
||||||
ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString()
|
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName != null ? coreNodeName : null);
|
||||||
: null,
|
|
||||||
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName != null ? coreNodeName
|
|
||||||
: null);
|
|
||||||
if (updateLastState) {
|
if (updateLastState) {
|
||||||
cd.getCloudDescriptor().lastPublished = state;
|
cd.getCloudDescriptor().lastPublished = state;
|
||||||
}
|
}
|
||||||
|
@ -1368,7 +1365,6 @@ public final class ZkController {
|
||||||
|
|
||||||
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
|
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
|
||||||
|
|
||||||
|
|
||||||
// make sure the node name is set on the descriptor
|
// make sure the node name is set on the descriptor
|
||||||
if (cloudDesc.getCoreNodeName() == null) {
|
if (cloudDesc.getCoreNodeName() == null) {
|
||||||
cloudDesc.setCoreNodeName(coreNodeName);
|
cloudDesc.setCoreNodeName(coreNodeName);
|
||||||
|
|
Loading…
Reference in New Issue