SOLR-5473 one state.json per collection , SOLR-5474 support for one json per collection

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1607418 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2014-07-02 16:50:32 +00:00
parent e15ff59929
commit af18eeed89
20 changed files with 1034 additions and 184 deletions

View File

@ -70,6 +70,10 @@ New Features
* SOLR-6103: Added DateRangeField for indexing date ranges, especially * SOLR-6103: Added DateRangeField for indexing date ranges, especially
multi-valued ones. Based on LUCENE-5648. (David Smiley) multi-valued ones. Based on LUCENE-5648. (David Smiley)
* SOLR-5473: Make one state.json per collection (noble, shalin, Timothy Potter ,Jessica Cheng, Anshum Gupta, Mark Miller)
* SOLR-5474: Add stateFormat=2 support to CloudSolrServer (Timothy Potter , noble , Jessica Cheng)
Other Changes Other Changes
---------------------- ----------------------

View File

@ -263,7 +263,7 @@ sb.append("(group_name=").append(tg.getName()).append(")");
private Map<String,Object> getReplicaProps(ZkController zkController, SolrCore core) { private Map<String,Object> getReplicaProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName(); final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
Replica replica = zkController.getClusterState().getReplica(collection, core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName()); Replica replica = zkController.getClusterState().getReplica(collection, core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(),true);
if(replica!=null) { if(replica!=null) {
return replica.getProperties(); return replica.getProperties();
} }

View File

@ -33,6 +33,7 @@ import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -104,6 +105,8 @@ public class Overseer {
private Map clusterProps; private Map clusterProps;
private boolean isClosed = false; private boolean isClosed = false;
private final Map<String, Object> updateNodes = new ConcurrentHashMap<>();
private boolean isClusterStateModified = false;
public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) { public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
@ -117,6 +120,7 @@ public class Overseer {
this.myId = myId; this.myId = myId;
this.reader = reader; this.reader = reader;
clusterProps = reader.getClusterProps(); clusterProps = reader.getClusterProps();
reader.setEphemeralCollectionData(Collections.unmodifiableMap(updateNodes));
} }
public Stats getStateUpdateQueueStats() { public Stats getStateUpdateQueueStats() {
@ -257,6 +261,7 @@ public class Overseer {
stateUpdateQueue.poll(); stateUpdateQueue.poll();
if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break; if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
if(!updateNodes.isEmpty()) break;
// if an event comes in the next 100ms batch it together // if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100); head = stateUpdateQueue.peek(100);
} }
@ -296,8 +301,28 @@ public class Overseer {
TimerContext timerContext = stats.time("update_state"); TimerContext timerContext = stats.time("update_state");
boolean success = false; boolean success = false;
try { try {
zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true); if(!updateNodes.isEmpty()) {
lastUpdatedTime = System.nanoTime(); for (Entry<String, Object> e : updateNodes.entrySet()) {
if (e.getValue() == null) {
if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true);
} else {
if (zkClient.exists(e.getKey(), true)) {
log.info("going to update_collection {}", e.getKey());
zkClient.setData(e.getKey(), ZkStateReader.toJSON(e.getValue()), true);
} else {
log.info("going to create_collection {}", e.getValue());
zkClient.create(e.getKey(), ZkStateReader.toJSON(e.getValue()), CreateMode.PERSISTENT, true);
}
}
}
updateNodes.clear();
}
if(isClusterStateModified) {
lastUpdatedTime = System.nanoTime();
zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true);
isClusterStateModified = false;
}
success = true; success = true;
} finally { } finally {
timerContext.stop(); timerContext.stop();
@ -699,7 +724,7 @@ public class Overseer {
} }
Slice slice = clusterState.getSlice(collection, sliceName); Slice slice = clusterState.getSlice(collection, sliceName);
Map<String,Object> replicaProps = new LinkedHashMap<>(); Map<String,Object> replicaProps = new LinkedHashMap<>();
replicaProps.putAll(message.getProperties()); replicaProps.putAll(message.getProperties());
@ -717,7 +742,7 @@ public class Overseer {
replicaProps.remove(ZkStateReader.SHARD_ID_PROP); replicaProps.remove(ZkStateReader.SHARD_ID_PROP);
replicaProps.remove(ZkStateReader.COLLECTION_PROP); replicaProps.remove(ZkStateReader.COLLECTION_PROP);
replicaProps.remove(QUEUE_OPERATION); replicaProps.remove(QUEUE_OPERATION);
// remove any props with null values // remove any props with null values
Set<Entry<String,Object>> entrySet = replicaProps.entrySet(); Set<Entry<String,Object>> entrySet = replicaProps.entrySet();
List<String> removeKeys = new ArrayList<>(); List<String> removeKeys = new ArrayList<>();
@ -865,10 +890,18 @@ public class Overseer {
} }
collectionProps.put(DocCollection.DOC_ROUTER, routerSpec); collectionProps.put(DocCollection.DOC_ROUTER, routerSpec);
if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true"); if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true");
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router); String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null : ZkStateReader.getCollectionPath(collectionName);
return newState(state, singletonMap(newCollection.getName(), newCollection)); DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router , -1,znode);
isClusterStateModified = true;
log.info("state version {} {}", collectionName, newCollection.getStateFormat());
if (newCollection.getStateFormat() > 1) {
updateNodes.put(ZkStateReader.getCollectionPath(collectionName),
new ClusterState(-1, Collections.<String>emptySet(), singletonMap(newCollection.getName(), newCollection), state.getStateReader()));
return state;
} }
return newState(state, singletonMap(newCollection.getName(), newCollection));
}
/* /*
* Return an already assigned id or null if not assigned * Return an already assigned id or null if not assigned
@ -905,30 +938,27 @@ public class Overseer {
} }
return null; return null;
} }
private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) { private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
// System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates())); // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
// System.out.println("Updating slice:" + slice); // System.out.println("Updating slice:" + slice);
DocCollection newCollection = null;
DocCollection coll = state.getCollectionOrNull(collectionName) ; DocCollection coll = state.getCollectionOrNull(collectionName) ;
Map<String,Slice> slices; Map<String,Slice> slices;
Map<String,Object> props;
DocRouter router;
if (coll == null) { if (coll == null) {
// when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself // when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
// without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router. // without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
slices = new HashMap<>(1); slices = new LinkedHashMap<>(1);
props = new HashMap<>(1); slices.put(slice.getName(), slice);
Map<String,Object> props = new HashMap<>(1);
props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME)); props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
router = new ImplicitDocRouter(); newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
} else { } else {
props = coll.getProperties();
router = coll.getRouter();
slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy
slices.put(slice.getName(), slice);
newCollection = coll.copyWith(slices);
} }
slices.put(slice.getName(), slice);
DocCollection newCollection = new DocCollection(collectionName, slices, props, router);
// System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections)); // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
@ -987,27 +1017,54 @@ public class Overseer {
} }
DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter()); DocCollection newCollection = coll.copyWith(slices);
return newState(state, singletonMap(collectionName, newCollection)); return newState(state, singletonMap(collectionName, newCollection));
} }
private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) { private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
return state.copyWith(colls); for (Entry<String, DocCollection> e : colls.entrySet()) {
} DocCollection c = e.getValue();
if (c == null) {
isClusterStateModified = true;
state = state.copyWith(singletonMap(e.getKey(), (DocCollection) null));
continue;
}
/* if (c.getStateFormat() >1) {
* Remove collection from cloudstate state.getStateReader().updateWatchedCollection(c);
*/ updateNodes.put(ZkStateReader.getCollectionPath(c.getName()), new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c), state.getStateReader()));
private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) { } else {
final String collection = message.getStr("name"); isClusterStateModified = true;
if (!checkKeyExistence(message, "name")) return clusterState; state = state.copyWith(singletonMap(e.getKey(), c));
DocCollection coll = clusterState.getCollectionOrNull(collection); }
if(coll !=null) { }
return state;
}
/*
* Remove collection from cloudstate
*/
private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
final String collection = message.getStr("name");
if (!checkKeyExistence(message, "name")) return clusterState;
DocCollection coll = clusterState.getCollectionOrNull(collection);
if(coll !=null) {
isClusterStateModified = true;
if(coll.getStateFormat()>1){
try {
log.info("Deleting state for collection : {}", collection);
zkClient.delete(ZkStateReader.getCollectionPath(collection),-1,true);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"Unable to remove collection state :"+collection);
}
return clusterState;
} else{
return clusterState.copyWith(singletonMap(collection,(DocCollection)null)); return clusterState.copyWith(singletonMap(collection,(DocCollection)null));
} }
return clusterState;
} }
return clusterState;
}
/* /*
* Remove collection slice from cloudstate * Remove collection slice from cloudstate
*/ */
@ -1023,7 +1080,7 @@ public class Overseer {
Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlicesMap()); Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlicesMap());
newSlices.remove(sliceId); newSlices.remove(sliceId);
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter()); DocCollection newCollection = coll.copyWith(newSlices);
return newState(clusterState, singletonMap(collection,newCollection)); return newState(clusterState, singletonMap(collection,newCollection));
} }
@ -1035,8 +1092,6 @@ public class Overseer {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP); final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return clusterState; if (!checkCollectionKeyExistence(message)) return clusterState;
// final Map<String, DocCollection> newCollections = new LinkedHashMap<>(clusterState.getCollectionStates()); // shallow copy
// DocCollection coll = newCollections.get(collection);
DocCollection coll = clusterState.getCollectionOrNull(collection) ; DocCollection coll = clusterState.getCollectionOrNull(collection) ;
if (coll == null) { if (coll == null) {
// TODO: log/error that we didn't find it? // TODO: log/error that we didn't find it?
@ -1074,7 +1129,7 @@ public class Overseer {
newSlices.put(slice.getName(), slice); newSlices.put(slice.getName(), slice);
} }
} }
if (lastSlice) { if (lastSlice) {
// remove all empty pre allocated slices // remove all empty pre allocated slices
for (Slice slice : coll.getSlices()) { for (Slice slice : coll.getSlices()) {
@ -1091,7 +1146,7 @@ public class Overseer {
// but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
// ZkController out of the Overseer. // ZkController out of the Overseer.
try { try {
zkClient.clean("/collections/" + collection); zkClient.delete("/collections/" + collection,-1,true);
} catch (InterruptedException e) { } catch (InterruptedException e) {
SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e); SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -1101,8 +1156,8 @@ public class Overseer {
return newState(clusterState,singletonMap(collection, (DocCollection) null)); return newState(clusterState,singletonMap(collection, (DocCollection) null));
} else { } else {
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter()); DocCollection newCollection = coll.copyWith(newSlices);
return newState(clusterState,singletonMap(collection,newCollection)); return newState(clusterState,singletonMap(collection,newCollection));
} }
} }

View File

@ -707,7 +707,13 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
Set<String> collections = clusterState.getCollections(); Set<String> collections = clusterState.getCollections();
for (String name : collections) { for (String name : collections) {
Map<String, Object> collectionStatus = null; Map<String, Object> collectionStatus = null;
collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, shard); if (clusterState.getCollection(name).getStateFormat()>1) {
bytes = ZkStateReader.toJSON(clusterState.getCollection(name));
Map<String, Object> docCollection = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
collectionStatus = getCollectionStatus(docCollection, name, shard);
} else {
collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, shard);
}
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) { if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
collectionStatus.put("aliases", collectionVsAliases.get(name)); collectionStatus.put("aliases", collectionVsAliases.get(name));
} }
@ -716,8 +722,12 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
} else { } else {
String routeKey = message.getStr(ShardParams._ROUTE_); String routeKey = message.getStr(ShardParams._ROUTE_);
Map<String, Object> docCollection = null; Map<String, Object> docCollection = null;
if (clusterState.getCollection(collection).getStateFormat()>1 ) {
docCollection = (Map<String, Object>) stateMap.get(collection); bytes = ZkStateReader.toJSON(clusterState.getCollection(collection));
docCollection = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
} else {
docCollection = (Map<String, Object>) stateMap.get(collection);
}
if (routeKey == null) { if (routeKey == null) {
Map<String, Object> collectionStatus = getCollectionStatus(docCollection, collection, shard); Map<String, Object> collectionStatus = getCollectionStatus(docCollection, collection, shard);
if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty()) { if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty()) {

View File

@ -1109,6 +1109,16 @@ public final class ZkController {
} }
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor(); CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
boolean removeWatch = true;
for (SolrCore solrCore : cc.getCores()) {//if there is no SolrCoe which is a member of this collection, remove the watch
CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor();
if (cloudDesc != null && cloudDescriptor.getCollectionName().equals(cloudDesc.getCollectionName())) {
//means
removeWatch = false;
break;
}
}
if(removeWatch) zkStateReader.removeZKWatch(collection);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName, Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(), ZkStateReader.NODE_NAME_PROP, getNodeName(),
@ -1411,6 +1421,10 @@ public final class ZkController {
publish(cd, ZkStateReader.DOWN, false, true); publish(cd, ZkStateReader.DOWN, false, true);
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName()); DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
if(collection !=null && collection.getStateFormat() >1 ){
log.info("Registering watch for collection {}",cd.getCloudDescriptor().getCollectionName());
zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
}
} catch (KeeperException e) { } catch (KeeperException e) {
log.error("", e); log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);

View File

@ -143,7 +143,7 @@ public class CollectionsHandler extends RequestHandlerBase {
if (action == null) { if (action == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a); throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a);
} }
switch (action) { switch (action) {
case CREATE: { case CREATE: {
this.handleCreateAction(req, rsp); this.handleCreateAction(req, rsp);
@ -320,36 +320,36 @@ public class CollectionsHandler extends RequestHandlerBase {
SolrQueryResponse rsp) throws KeeperException, InterruptedException { SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT); handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
} }
private void handleResponse(String operation, ZkNodeProps m, private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException { SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.nanoTime(); long time = System.nanoTime();
if(m.containsKey(ASYNC) && m.get(ASYNC) != null) { if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
String asyncId = m.getStr(ASYNC); String asyncId = m.getStr(ASYNC);
if(asyncId.equals("-1")) { if(asyncId.equals("-1")) {
throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes."); throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
} }
NamedList<String> r = new NamedList<>(); NamedList<String> r = new NamedList<>();
if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) || if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) || coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) || coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
overseerCollectionQueueContains(asyncId)) { overseerCollectionQueueContains(asyncId)) {
r.add("error", "Task with the same requestid already exists."); r.add("error", "Task with the same requestid already exists.");
} else { } else {
coreContainer.getZkController().getOverseerCollectionQueue() coreContainer.getZkController().getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m)); .offer(ZkStateReader.toJSON(m));
} }
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC)); r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
SolrResponse response = new OverseerSolrResponse(r); SolrResponse response = new OverseerSolrResponse(r);
rsp.getValues().addAll(response.getResponse()); rsp.getValues().addAll(response.getResponse());
return; return;
} }
@ -380,27 +380,27 @@ public class CollectionsHandler extends RequestHandlerBase {
} }
} }
} }
private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Reloading Collection : " + req.getParamString()); log.info("Reloading Collection : " + req.getParamString());
String name = req.getParams().required().get("name"); String name = req.getParams().required().get("name");
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.RELOADCOLLECTION, "name", name); OverseerCollectionProcessor.RELOADCOLLECTION, "name", name);
handleResponse(OverseerCollectionProcessor.RELOADCOLLECTION, m, rsp); handleResponse(OverseerCollectionProcessor.RELOADCOLLECTION, m, rsp);
} }
private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException { private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException {
log.info("Syncing shard : " + req.getParamString()); log.info("Syncing shard : " + req.getParamString());
String collection = req.getParams().required().get("collection"); String collection = req.getParams().required().get("collection");
String shard = req.getParams().required().get("shard"); String shard = req.getParams().required().get("shard");
ClusterState clusterState = coreContainer.getZkController().getClusterState(); ClusterState clusterState = coreContainer.getZkController().getClusterState();
ZkNodeProps leaderProps = clusterState.getLeader(collection, shard); ZkNodeProps leaderProps = clusterState.getLeader(collection, shard);
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps); ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
HttpSolrServer server = new HttpSolrServer(nodeProps.getBaseUrl()); HttpSolrServer server = new HttpSolrServer(nodeProps.getBaseUrl());
try { try {
server.setConnectionTimeout(15000); server.setConnectionTimeout(15000);
@ -414,36 +414,36 @@ public class CollectionsHandler extends RequestHandlerBase {
server.shutdown(); server.shutdown();
} }
} }
private void handleCreateAliasAction(SolrQueryRequest req, private void handleCreateAliasAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws Exception { SolrQueryResponse rsp) throws Exception {
log.info("Create alias action : " + req.getParamString()); log.info("Create alias action : " + req.getParamString());
String name = req.getParams().required().get("name"); String name = req.getParams().required().get("name");
String collections = req.getParams().required().get("collections"); String collections = req.getParams().required().get("collections");
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATEALIAS, "name", name, "collections", OverseerCollectionProcessor.CREATEALIAS, "name", name, "collections",
collections); collections);
handleResponse(OverseerCollectionProcessor.CREATEALIAS, m, rsp); handleResponse(OverseerCollectionProcessor.CREATEALIAS, m, rsp);
} }
private void handleDeleteAliasAction(SolrQueryRequest req, private void handleDeleteAliasAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws Exception { SolrQueryResponse rsp) throws Exception {
log.info("Delete alias action : " + req.getParamString()); log.info("Delete alias action : " + req.getParamString());
String name = req.getParams().required().get("name"); String name = req.getParams().required().get("name");
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.DELETEALIAS, "name", name); OverseerCollectionProcessor.DELETEALIAS, "name", name);
handleResponse(OverseerCollectionProcessor.DELETEALIAS, m, rsp); handleResponse(OverseerCollectionProcessor.DELETEALIAS, m, rsp);
} }
private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Deleting Collection : " + req.getParamString()); log.info("Deleting Collection : " + req.getParamString());
String name = req.getParams().required().get("name"); String name = req.getParams().required().get("name");
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.DELETECOLLECTION, "name", name); OverseerCollectionProcessor.DELETECOLLECTION, "name", name);
@ -464,7 +464,7 @@ public class CollectionsHandler extends RequestHandlerBase {
throw new SolrException(ErrorCode.BAD_REQUEST, throw new SolrException(ErrorCode.BAD_REQUEST,
"Collection name is required to create a new collection"); "Collection name is required to create a new collection");
} }
Map<String,Object> props = ZkNodeProps.makeMap( Map<String,Object> props = ZkNodeProps.makeMap(
Overseer.QUEUE_OPERATION, Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATECOLLECTION, OverseerCollectionProcessor.CREATECOLLECTION,
@ -477,6 +477,7 @@ public class CollectionsHandler extends RequestHandlerBase {
MAX_SHARDS_PER_NODE, MAX_SHARDS_PER_NODE,
CREATE_NODE_SET , CREATE_NODE_SET ,
SHARDS_PROP, SHARDS_PROP,
DocCollection.STATE_FORMAT,
ASYNC, ASYNC,
"router."); "router.");
@ -553,7 +554,7 @@ public class CollectionsHandler extends RequestHandlerBase {
log.info("Deleting Shard : " + req.getParamString()); log.info("Deleting Shard : " + req.getParamString());
String name = req.getParams().required().get(ZkStateReader.COLLECTION_PROP); String name = req.getParams().required().get(ZkStateReader.COLLECTION_PROP);
String shard = req.getParams().required().get(ZkStateReader.SHARD_ID_PROP); String shard = req.getParams().required().get(ZkStateReader.SHARD_ID_PROP);
Map<String,Object> props = new HashMap<>(); Map<String,Object> props = new HashMap<>();
props.put(ZkStateReader.COLLECTION_PROP, name); props.put(ZkStateReader.COLLECTION_PROP, name);
props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.DELETESHARD); props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.DELETESHARD);

View File

@ -324,6 +324,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
String coreUrl = getRemotCoreUrl(cores, corename, origCorename); String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
// don't proxy for internal update requests // don't proxy for internal update requests
SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString()); SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
checkStateIsValid(cores, queryParams.get(CloudSolrServer.STATE_VERSION));
if (coreUrl != null if (coreUrl != null
&& queryParams && queryParams
.get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) { .get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
@ -379,6 +380,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
if( "/select".equals( path ) || "/select/".equals( path ) ) { if( "/select".equals( path ) || "/select/".equals( path ) ) {
solrReq = parser.parse( core, path, req ); solrReq = parser.parse( core, path, req );
checkStateIsValid(cores,solrReq.getParams().get(CloudSolrServer.STATE_VERSION));
String qt = solrReq.getParams().get( CommonParams.QT ); String qt = solrReq.getParams().get( CommonParams.QT );
handler = core.getRequestHandler( qt ); handler = core.getRequestHandler( qt );
if( handler == null ) { if( handler == null ) {
@ -468,6 +470,22 @@ public class SolrDispatchFilter extends BaseSolrFilter {
chain.doFilter(request, response); chain.doFilter(request, response);
} }
private void checkStateIsValid(CoreContainer cores, String stateVer) {
if(stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware() ){
// many have multiple collections separated by |
String[] pairs = StringUtils.split(stateVer, '|');
for (String pair : pairs) {
String[] pcs = StringUtils.split(pair, ':');
if(pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()){
Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0],Integer.parseInt(pcs[1]));
if(Boolean.TRUE != status){
throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair+ "valid : "+status);
}
}
}
}
}
private void processAliases(SolrQueryRequest solrReq, Aliases aliases, private void processAliases(SolrQueryRequest solrReq, Aliases aliases,
List<String> collectionsList) { List<String> collectionsList) {

View File

@ -17,6 +17,26 @@
package org.apache.solr.servlet; package org.apache.solr.servlet;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.FastWriter;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.noggit.ObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
@ -29,30 +49,6 @@ import java.util.Map;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.lucene.util.BytesRef;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.noggit.ObjectBuilder;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.FastWriter;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Zookeeper Info * Zookeeper Info
@ -90,7 +86,6 @@ public final class ZookeeperInfoServlet extends HttpServlet {
String path = params.get("path"); String path = params.get("path");
String addr = params.get("addr"); String addr = params.get("addr");
boolean all = "true".equals(params.get("all"));
if (addr != null && addr.length() == 0) { if (addr != null && addr.length() == 0) {
addr = null; addr = null;
@ -110,6 +105,7 @@ public final class ZookeeperInfoServlet extends HttpServlet {
ZKPrinter printer = new ZKPrinter(response, out, cores.getZkController(), addr); ZKPrinter printer = new ZKPrinter(response, out, cores.getZkController(), addr);
printer.detail = detail; printer.detail = detail;
printer.dump = dump; printer.dump = dump;
printer.isTreeView = (params.get("wt") == null); // this is hacky but tree view requests don't come in with the wt set
try { try {
printer.print(path); printer.print(path);
@ -140,6 +136,8 @@ public final class ZookeeperInfoServlet extends HttpServlet {
boolean detail = false; boolean detail = false;
boolean dump = false; boolean dump = false;
boolean isTreeView = false;
String addr; // the address passed to us String addr; // the address passed to us
String keeperAddr; // the address we're connected to String keeperAddr; // the address we're connected to
@ -385,6 +383,47 @@ public final class ZookeeperInfoServlet extends HttpServlet {
dataStrErr = "data is not parsable as a utf8 String: " + e.toString(); dataStrErr = "data is not parsable as a utf8 String: " + e.toString();
} }
} }
// pull in external collections too
if (ZkStateReader.CLUSTER_STATE.equals(path) && !isTreeView) {
SortedMap<String,Object> collectionStates = null;
List<String> children = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, null, true);
java.util.Collections.sort(children);
for (String collection : children) {
String collStatePath = ZkStateReader.getCollectionPath(collection);
String childDataStr = null;
try {
byte[] childData = zkClient.getData(collStatePath, null, null, true);
if (childData != null) {
childDataStr = (new BytesRef(childData)).utf8ToString();
}
} catch (KeeperException.NoNodeException nne) {
// safe to ignore
} catch (Exception childErr) {
log.error("Failed to get "+collStatePath+" due to: "+childErr);
}
if (childDataStr != null) {
if (collectionStates == null) {
// initialize lazily as there may not be any external collections
collectionStates = new TreeMap<>();
// add the internal collections
if (dataStr != null)
collectionStates.putAll((Map<String,Object>)ObjectBuilder.fromJSON(dataStr));
}
// now add in the external collections
Map<String,Object> extColl = (Map<String,Object>)ObjectBuilder.fromJSON(childDataStr);
collectionStates.put(collection, extColl.get(collection));
}
}
if (collectionStates != null) {
CharArr out = new CharArr();
new JSONWriter(out, 2).write(collectionStates);
dataStr = out.toString();
}
}
json.writeString("znode"); json.writeString("znode");
json.writeNameSeparator(); json.writeNameSeparator();

View File

@ -234,7 +234,7 @@ public class SolrLogLayout extends Layout {
private Map<String,Object> getReplicaProps(ZkController zkController, SolrCore core) { private Map<String,Object> getReplicaProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName(); final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
Replica replica = zkController.getClusterState().getReplica(collection, zkController.getCoreNodeName(core.getCoreDescriptor())); Replica replica = zkController.getZkStateReader().getClusterState(). getReplica(collection, zkController.getCoreNodeName(core.getCoreDescriptor()), true);
if(replica!=null) { if(replica!=null) {
return replica.getProperties(); return replica.getProperties();
} }

View File

@ -86,7 +86,7 @@ public class AssignTest extends SolrTestCaseJ4 {
collectionStates.put(cname, docCollection); collectionStates.put(cname, docCollection);
Set<String> liveNodes = new HashSet<>(); Set<String> liveNodes = new HashSet<>();
ClusterState state = new ClusterState(-1,liveNodes, collectionStates); ClusterState state = new ClusterState(-1,liveNodes, collectionStates, ClusterStateTest.getMockZkStateReader(collectionStates.keySet()));
String nodeName = Assign.assignNode("collection1", state); String nodeName = Assign.assignNode("collection1", state);
assertEquals("core_node2", nodeName); assertEquals("core_node2", nodeName);

View File

@ -62,10 +62,10 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT)); collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT));
ZkStateReader zkStateReaderMock = getMockZkStateReader(collectionStates.keySet()); ZkStateReader zkStateReaderMock = getMockZkStateReader(collectionStates.keySet());
ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates); ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates,zkStateReaderMock);
byte[] bytes = ZkStateReader.toJSON(clusterState); byte[] bytes = ZkStateReader.toJSON(clusterState);
// System.out.println("#################### " + new String(bytes)); // System.out.println("#################### " + new String(bytes));
ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes); ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes,zkStateReaderMock,null);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size()); .getLiveNodes().size());
@ -73,13 +73,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
assertEquals("Poperties not copied properly", replica.getStr("prop1"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop1")); assertEquals("Poperties not copied properly", replica.getStr("prop1"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop1"));
assertEquals("Poperties not copied properly", replica.getStr("prop2"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop2")); assertEquals("Poperties not copied properly", replica.getStr("prop2"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop2"));
loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes); loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes, getMockZkStateReader(Collections.<String>emptySet()),null );
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size()); .getLiveNodes().size());
assertEquals("Should not have collections", 0, loadedClusterState.getCollections().size()); assertEquals("Should not have collections", 0, loadedClusterState.getCollections().size());
loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes); loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes,getMockZkStateReader(Collections.<String>emptySet()),null);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size()); .getLiveNodes().size());
@ -89,6 +89,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
public static ZkStateReader getMockZkStateReader(final Set<String> collections) { public static ZkStateReader getMockZkStateReader(final Set<String> collections) {
ZkStateReader mock = createMock(ZkStateReader.class); ZkStateReader mock = createMock(ZkStateReader.class);
EasyMock.reset(mock); EasyMock.reset(mock);
mock.getAllCollections();
EasyMock.expectLastCall().andAnswer(new IAnswer<Set<String>>() {
@Override
public Set<String> answer() throws Throwable {
return collections;
}
}).anyTimes();
EasyMock.replay(mock); EasyMock.replay(mock);
return mock; return mock;

View File

@ -0,0 +1,119 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
public class ExternalCollectionsTest extends AbstractFullDistribZkTestBase {
private CloudSolrServer client;
@BeforeClass
public static void beforeThisClass2() throws Exception {
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
System.setProperty("numShards", Integer.toString(sliceCount));
System.setProperty("solr.xml.persist", "true");
client = createCloudClient(null);
}
@After
public void tearDown() throws Exception {
super.tearDown();
client.shutdown();
}
protected String getSolrXml() {
return "solr-no-core.xml";
}
public ExternalCollectionsTest() {
fixShardCount = true;
sliceCount = 2;
shardCount = 4;
checkCreatedVsState = false;
}
@Override
public void doTest() throws Exception {
testZkNodeLocation();
}
boolean externalColl = false;
@Override
protected int getStateFormat() {
return externalColl ? 2:1;
}
private void testZkNodeLocation() throws Exception{
externalColl=true;
String collectionName = "myExternColl";
createCollection(collectionName, client, 2, 2);
waitForRecoveriesToFinish(collectionName, false);
assertTrue("does not exist collection state externally",
cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
Stat stat = new Stat();
byte[] data = cloudClient.getZkStateReader().getZkClient().getData(ZkStateReader.getCollectionPath(collectionName), null, stat, true);
DocCollection c = ZkStateReader.getCollectionLive(cloudClient.getZkStateReader(), collectionName);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
assertEquals("The zkversion of the nodes must be same zkver:" + stat.getVersion() , stat.getVersion(),clusterState.getCollection(collectionName).getZNodeVersion() );
assertTrue("DocCllection#getStateFormat() must be > 1", cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getStateFormat() > 1);
// remove collection
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.DELETE.toString());
params.set("name", collectionName);
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
if (client == null) {
client = createCloudClient(null);
}
client.request(request);
checkForMissingCollection(collectionName);
assertFalse("collection state should not exist externally", cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
}
}

View File

@ -50,9 +50,9 @@ public class SliceStateTest extends SolrTestCaseJ4 {
collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT)); collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
ZkStateReader mockZkStateReader = ClusterStateTest.getMockZkStateReader(collectionStates.keySet()); ZkStateReader mockZkStateReader = ClusterStateTest.getMockZkStateReader(collectionStates.keySet());
ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates); ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates, mockZkStateReader);
byte[] bytes = ZkStateReader.toJSON(clusterState); byte[] bytes = ZkStateReader.toJSON(clusterState);
ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes); ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes, mockZkStateReader,null);
assertEquals("Default state not set to active", "active", loadedClusterState.getSlice("collection1", "shard1").getState()); assertEquals("Default state not set to active", "active", loadedClusterState.getSlice("collection1", "shard1").getState());
} }

View File

@ -18,7 +18,9 @@ package org.apache.solr.client.solrj.impl;
*/ */
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.SocketException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -30,13 +32,16 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.solr.client.solrj.ResponseParser; import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServer;
@ -67,6 +72,8 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* SolrJ client class to communicate with SolrCloud. * SolrJ client class to communicate with SolrCloud.
@ -79,6 +86,8 @@ import org.apache.zookeeper.KeeperException;
* with {@link #setIdField(String)}. * with {@link #setIdField(String)}.
*/ */
public class CloudSolrServer extends SolrServer { public class CloudSolrServer extends SolrServer {
private static final Logger log = LoggerFactory.getLogger(CloudSolrServer.class);
private volatile ZkStateReader zkStateReader; private volatile ZkStateReader zkStateReader;
private String zkHost; // the zk server address private String zkHost; // the zk server address
private int zkConnectTimeout = 10000; private int zkConnectTimeout = 10000;
@ -87,6 +96,8 @@ public class CloudSolrServer extends SolrServer {
private final LBHttpSolrServer lbServer; private final LBHttpSolrServer lbServer;
private final boolean shutdownLBHttpSolrServer; private final boolean shutdownLBHttpSolrServer;
private HttpClient myClient; private HttpClient myClient;
//no of times collection state to be reloaded if stale state error is received
private static final int MAX_STALE_RETRIES = 5;
Random rand = new Random(); Random rand = new Random();
private final boolean updatesToLeaders; private final boolean updatesToLeaders;
@ -95,6 +106,7 @@ public class CloudSolrServer extends SolrServer {
.newCachedThreadPool(new SolrjNamedThreadFactory( .newCachedThreadPool(new SolrjNamedThreadFactory(
"CloudSolrServer ThreadPool")); "CloudSolrServer ThreadPool"));
private String idField = "id"; private String idField = "id";
public static final String STATE_VERSION = "_stateVer_";
private final Set<String> NON_ROUTABLE_PARAMS; private final Set<String> NON_ROUTABLE_PARAMS;
{ {
NON_ROUTABLE_PARAMS = new HashSet<>(); NON_ROUTABLE_PARAMS = new HashSet<>();
@ -112,8 +124,36 @@ public class CloudSolrServer extends SolrServer {
// NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK); // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
} }
private volatile long timeToLive = 60* 1000L;
protected Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
@Override
public ExpiringCachedDocCollection get(Object key) {
ExpiringCachedDocCollection val = super.get(key);
if(val == null) return null;
if(val.isExpired(timeToLive)) {
super.remove(key);
return null;
}
return val;
}
};
class ExpiringCachedDocCollection {
DocCollection cached;
long cachedAt;
ExpiringCachedDocCollection(DocCollection cached) {
this.cached = cached;
this.cachedAt = System.currentTimeMillis();
}
boolean isExpired(long timeToLive) {
return (System.currentTimeMillis() - cachedAt) > timeToLive;
}
}
/** /**
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state, * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
@ -127,6 +167,8 @@ public class CloudSolrServer extends SolrServer {
this.lbServer.setParser(new BinaryResponseParser()); this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = true; this.updatesToLeaders = true;
shutdownLBHttpSolrServer = true; shutdownLBHttpSolrServer = true;
setupStateVerParamOnQueryString(lbServer);
} }
public CloudSolrServer(String zkHost, boolean updatesToLeaders) public CloudSolrServer(String zkHost, boolean updatesToLeaders)
@ -138,6 +180,15 @@ public class CloudSolrServer extends SolrServer {
this.lbServer.setParser(new BinaryResponseParser()); this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = updatesToLeaders; this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = true; shutdownLBHttpSolrServer = true;
setupStateVerParamOnQueryString(lbServer);
}
/**Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
* @param seconds ttl value in seconds
*/
public void setCollectionCacheTTl(int seconds){
assert seconds > 0;
timeToLive = seconds*1000L;
} }
/** /**
@ -150,6 +201,7 @@ public class CloudSolrServer extends SolrServer {
this.lbServer = lbServer; this.lbServer = lbServer;
this.updatesToLeaders = true; this.updatesToLeaders = true;
shutdownLBHttpSolrServer = false; shutdownLBHttpSolrServer = false;
setupStateVerParamOnQueryString(lbServer);
} }
/** /**
@ -163,8 +215,24 @@ public class CloudSolrServer extends SolrServer {
this.lbServer = lbServer; this.lbServer = lbServer;
this.updatesToLeaders = updatesToLeaders; this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = false; shutdownLBHttpSolrServer = false;
setupStateVerParamOnQueryString(lbServer);
} }
/**
* Used internally to setup the _stateVer_ param to be sent in the query string of requests
* coming from this instance.
*/
protected void setupStateVerParamOnQueryString(LBHttpSolrServer lbServer) {
// setup the stateVer param to be passed in the query string of every request
Set<String> queryStringParams = lbServer.getQueryParams();
if (queryStringParams == null) {
queryStringParams = new HashSet<String>(2);
lbServer.setQueryParams(queryStringParams);
}
queryStringParams.add(STATE_VERSION);
}
public ResponseParser getParser() { public ResponseParser getParser() {
return lbServer.getParser(); return lbServer.getParser();
} }
@ -238,8 +306,7 @@ public class CloudSolrServer extends SolrServer {
if (zkStateReader == null) { if (zkStateReader == null) {
ZkStateReader zk = null; ZkStateReader zk = null;
try { try {
zk = new ZkStateReader(zkHost, zkClientTimeout, zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate(); zk.createClusterStateWatchersAndUpdate();
zkStateReader = zk; zkStateReader = zk;
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -302,7 +369,7 @@ public class CloudSolrServer extends SolrServer {
} }
} }
DocCollection col = clusterState.getCollection(collection); DocCollection col = getDocCollection(clusterState, collection);
DocRouter router = col.getRouter(); DocRouter router = col.getRouter();
@ -519,7 +586,146 @@ public class CloudSolrServer extends SolrServer {
} }
@Override @Override
public NamedList<Object> request(SolrRequest request) public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
SolrParams reqParams = request.getParams();
String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection();
return requestWithRetryOnStaleState(request, 0, collection);
}
/**
* As this class doesn't watch external collections on the client side,
* there's a chance that the request will fail due to cached stale state,
* which means the state must be refreshed from ZK and retried.
*/
protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
throws SolrServerException, IOException {
connect(); // important to call this before you start working with the ZkStateReader
// build up a _stateVer_ param to pass to the server containing all of the
// external collection state versions involved in this request, which allows
// the server to notify us that our cached state for one or more of the external
// collections is stale and needs to be refreshed ... this code has no impact on internal collections
String stateVerParam = null;
List<DocCollection> requestedCollections = null;
if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests
Set<String> requestedCollectionNames = getCollectionList(getZkStateReader().getClusterState(), collection);
StringBuilder stateVerParamBuilder = null;
for (String requestedCollection : requestedCollectionNames) {
// track the version of state we're using on the client side using the _stateVer_ param
DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
int collVer = coll.getZNodeVersion();
if (coll.getStateFormat()>1) {
if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
requestedCollections.add(coll);
if (stateVerParamBuilder == null) {
stateVerParamBuilder = new StringBuilder();
} else {
stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
}
stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
}
}
if (stateVerParamBuilder != null) {
stateVerParam = stateVerParamBuilder.toString();
}
}
if (request.getParams() instanceof ModifiableSolrParams) {
ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
if (stateVerParam != null) {
params.set(STATE_VERSION, stateVerParam);
} else {
params.remove(STATE_VERSION);
}
} // else: ??? how to set this ???
NamedList<Object> resp = null;
try {
resp = sendRequest(request);
} catch (Exception exc) {
Throwable rootCause = SolrException.getRootCause(exc);
// don't do retry support for admin requests or if the request doesn't have a collection specified
if (collection == null || request.getPath().startsWith("/admin")) {
if (exc instanceof SolrServerException) {
throw (SolrServerException)exc;
} else if (exc instanceof IOException) {
throw (IOException)exc;
}else if (exc instanceof RuntimeException) {
throw (RuntimeException) exc;
}
else {
throw new SolrServerException(rootCause);
}
}
int errorCode = (rootCause instanceof SolrException) ?
((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
log.error("Request to collection {} failed due to ("+errorCode+
") {}, retry? "+retryCount, collection, rootCause.toString());
boolean wasCommError =
(rootCause instanceof ConnectException ||
rootCause instanceof ConnectTimeoutException ||
rootCause instanceof NoHttpResponseException ||
rootCause instanceof SocketException);
boolean stateWasStale = false;
if (retryCount < MAX_STALE_RETRIES &&
!requestedCollections.isEmpty() &&
SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
{
// cached state for one or more external collections was stale
// re-issue request using updated state
stateWasStale = true;
// just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
for (DocCollection ext : requestedCollections) {
collectionStateCache.remove(ext.getName());
}
}
// if we experienced a communication error, it's worth checking the state
// with ZK just to make sure the node we're trying to hit is still part of the collection
if (retryCount < MAX_STALE_RETRIES && !stateWasStale && !requestedCollections.isEmpty() && wasCommError) {
for (DocCollection ext : requestedCollections) {
DocCollection latestStateFromZk = getZkStateReader().getCollection(ext.getName());
if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
// looks like we couldn't reach the server because the state was stale == retry
stateWasStale = true;
// we just pulled state from ZK, so update the cache so that the retry uses it
collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
}
}
}
requestedCollections.clear(); // done with this
// if the state was stale, then we retry the request once with new state pulled from Zk
if (stateWasStale) {
log.warn("Re-trying request to collection(s) "+collection+" after stale state error from server.");
resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
} else {
if (exc instanceof SolrServerException) {
throw (SolrServerException)exc;
} else if (exc instanceof IOException) {
throw (IOException)exc;
} else {
throw new SolrServerException(rootCause);
}
}
}
return resp;
}
protected NamedList<Object> sendRequest(SolrRequest request)
throws SolrServerException, IOException { throws SolrServerException, IOException {
connect(); connect();
@ -579,7 +785,7 @@ public class CloudSolrServer extends SolrServer {
// add it to the Map of slices. // add it to the Map of slices.
Map<String,Slice> slices = new HashMap<>(); Map<String,Slice> slices = new HashMap<>();
for (String collectionName : collectionsList) { for (String collectionName : collectionsList) {
DocCollection col = clusterState.getCollection(collectionName); DocCollection col = getDocCollection(clusterState, collectionName);
Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col); Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
ClientUtils.addSlices(slices, collectionName, routeSlices, true); ClientUtils.addSlices(slices, collectionName, routeSlices, true);
} }
@ -671,14 +877,17 @@ public class CloudSolrServer extends SolrServer {
Aliases aliases = zkStateReader.getAliases(); Aliases aliases = zkStateReader.getAliases();
String alias = aliases.getCollectionAlias(collectionName); String alias = aliases.getCollectionAlias(collectionName);
if (alias != null) { if (alias != null) {
List<String> aliasList = StrUtils.splitSmart(alias, ",", true); List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
collectionsList.addAll(aliasList); collectionsList.addAll(aliasList);
continue; continue;
} }
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName); DocCollection docCollection = getDocCollection(clusterState, collection);
if (docCollection == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
}
} }
collectionsList.add(collectionName); collectionsList.add(collectionName);
} }
return collectionsList; return collectionsList;
@ -715,6 +924,28 @@ public class CloudSolrServer extends SolrServer {
return updatesToLeaders; return updatesToLeaders;
} }
protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null;
if (cachedState != null && cachedState.cached != null) {
return cachedState.cached;
}
DocCollection col = clusterState.getCollectionOrNull(collection);
if(col == null ) return null;
collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
return col;
}
/**
* Extension point to allow sub-classes to override the ZkStateReader this class uses internally.
*/
protected ZkStateReader createZkStateReader(String zkHost, int zkClientTimeout, int zkConnectTimeout)
throws InterruptedException, TimeoutException, IOException, KeeperException {
ZkStateReader zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate();
return zk;
}
/** /**
* Useful for determining the minimum achieved replication factor across * Useful for determining the minimum achieved replication factor across
* all shards involved in processing an update request, typically useful * all shards involved in processing an update request, typically useful

View File

@ -46,36 +46,25 @@ public class ClusterState implements JSONWriter.Writable {
private final Map<String, DocCollection> collectionStates; // Map<collectionName, Map<sliceName,Slice>> private final Map<String, DocCollection> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
private Set<String> liveNodes; private Set<String> liveNodes;
private final ZkStateReader stateReader;
/**
* Use this constr when ClusterState is meant for publication.
*
* hashCode and equals will only depend on liveNodes and not clusterStateVersion.
*/
@Deprecated
public ClusterState(Set<String> liveNodes,
Map<String, DocCollection> collectionStates) {
this(null, liveNodes, collectionStates);
}
/** /**
* Use this constr when ClusterState is meant for consumption. * Use this constr when ClusterState is meant for consumption.
*/ */
public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes, public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
Map<String, DocCollection> collectionStates) { Map<String, DocCollection> collectionStates, ZkStateReader stateReader) {
assert stateReader != null;
this.zkClusterStateVersion = zkClusterStateVersion; this.zkClusterStateVersion = zkClusterStateVersion;
this.liveNodes = new HashSet<>(liveNodes.size()); this.liveNodes = new HashSet<>(liveNodes.size());
this.liveNodes.addAll(liveNodes); this.liveNodes.addAll(liveNodes);
this.collectionStates = new LinkedHashMap<>(collectionStates.size()); this.collectionStates = new LinkedHashMap<>(collectionStates.size());
this.collectionStates.putAll(collectionStates); this.collectionStates.putAll(collectionStates);
this.stateReader = stateReader;
} }
public ClusterState copyWith(Map<String,DocCollection> modified){ public ClusterState copyWith(Map<String,DocCollection> modified){
ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates); ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates,stateReader);
for (Entry<String, DocCollection> e : modified.entrySet()) { for (Entry<String, DocCollection> e : modified.entrySet()) {
DocCollection c = e.getValue(); DocCollection c = e.getValue();
if(c == null) { if(c == null) {
@ -98,17 +87,10 @@ public class ClusterState implements JSONWriter.Writable {
if (slice == null) return null; if (slice == null) return null;
return slice.getLeader(); return slice.getLeader();
} }
private Replica getReplica(DocCollection coll, String replicaName) {
if (coll == null) return null;
for (Slice slice : coll.getSlices()) {
Replica replica = slice.getReplica(replicaName);
if (replica != null) return replica;
}
return null;
}
public boolean hasCollection(String coll) { public boolean hasCollection(String coll) {
return collectionStates.containsKey(coll) ; if (collectionStates.containsKey(coll)) return true;
return stateReader.getAllCollections().contains(coll);
} }
/** /**
@ -116,8 +98,9 @@ public class ClusterState implements JSONWriter.Writable {
* If the slice is known, do not use this method. * If the slice is known, do not use this method.
* coreNodeName is the same as replicaName * coreNodeName is the same as replicaName
*/ */
public Replica getReplica(final String collection, final String coreNodeName) { public Replica getReplica(final String collection, final String coreNodeName, boolean cachedOnly) {
return getReplica(collectionStates.get(collection), coreNodeName); DocCollection coll = stateReader.getCollection(collection,cachedOnly);
return coll == null? null: coll.getReplica(coreNodeName);
} }
/** /**
@ -153,6 +136,35 @@ public class ClusterState implements JSONWriter.Writable {
return coll.getActiveSlices(); return coll.getActiveSlices();
} }
/**
* Get the {@code DocCollection} object if available. This method will
* never hit ZooKeeper and attempt to fetch collection from locally available
* state only.
*
* @param collection the name of the collection
* @return the {@link org.apache.solr.common.cloud.DocCollection} or null if not found
*/
private DocCollection getCachedCollection(String collection) {
DocCollection c = collectionStates.get(collection);
if (c != null) return c;
if (!stateReader.getAllCollections().contains(collection)) return null;
return stateReader.getCollection(collection, true); // return from cache
}
/** expert internal use only
* Gets the replica from caches by the core name (assuming the slice is unknown) or null if replica is not found.
* If the slice is known, do not use this method.
* coreNodeName is the same as replicaName
*/
public Replica getCachedReplica(String collectionName, String coreNodeName) {
DocCollection c = getCachedCollection(collectionName);
if (c == null) return null;
for (Slice slice : c.getSlices()) {
Replica replica = slice.getReplica(coreNodeName);
if (replica != null) return replica;
}
return null;
}
/** /**
* Get the named DocCollection object, or throw an exception if it doesn't exist. * Get the named DocCollection object, or throw an exception if it doesn't exist.
@ -163,23 +175,26 @@ public class ClusterState implements JSONWriter.Writable {
return coll; return coll;
} }
public DocCollection getCollectionOrNull(String coll) { public DocCollection getCollectionOrNull(String coll) {
return collectionStates.get(coll); DocCollection c = collectionStates.get(coll);
if (c != null) return c;
if (!stateReader.getAllCollections().contains(coll)) return null;
return stateReader.getCollection(coll);
} }
/** /**
* Get collection names. * Get collection names.
*/ */
public Set<String> getCollections() { public Set<String> getCollections() {
return collectionStates.keySet(); return stateReader.getAllCollections();
} }
/** /**
* For internal use only
* @return Map&lt;collectionName, Map&lt;sliceName,Slice&gt;&gt; * @return Map&lt;collectionName, Map&lt;sliceName,Slice&gt;&gt;
*/ */
public Map<String, DocCollection> getCollectionStates() { Map<String, DocCollection> getCollectionStates() {
return Collections.unmodifiableMap(collectionStates); return collectionStates;
} }
/** /**
@ -238,7 +253,7 @@ public class ClusterState implements JSONWriter.Writable {
Stat stat = new Stat(); Stat stat = new Stat();
byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE, byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
null, stat, true); null, stat, true);
return load(stat.getVersion(), state, liveNodes); return load(stat.getVersion(), state, liveNodes, stateReader, ZkStateReader.CLUSTER_STATE);
} }
@ -250,25 +265,34 @@ public class ClusterState implements JSONWriter.Writable {
* @param version zk version of the clusterstate.json file (bytes) * @param version zk version of the clusterstate.json file (bytes)
* @param bytes clusterstate.json as a byte array * @param bytes clusterstate.json as a byte array
* @param liveNodes list of live nodes * @param liveNodes list of live nodes
* @param stateReader The ZkStateReader for this clusterstate
* @param znode the znode from which this data is read from
* @return the ClusterState * @return the ClusterState
*/ */
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) { public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, ZkStateReader stateReader, String znode) {
// System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes))); // System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
if (bytes == null || bytes.length == 0) { if (bytes == null || bytes.length == 0) {
return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap()); return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap(),stateReader);
} }
Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes); Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
Map<String,DocCollection> collections = new LinkedHashMap<>(stateMap.size()); Map<String,DocCollection> collections = new LinkedHashMap<>(stateMap.size());
for (Entry<String, Object> entry : stateMap.entrySet()) { for (Entry<String, Object> entry : stateMap.entrySet()) {
String collectionName = entry.getKey(); String collectionName = entry.getKey();
DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version); DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version, znode);
collections.put(collectionName, coll); collections.put(collectionName, coll);
} }
// System.out.println("######## ClusterState.load result:" + collections); // System.out.println("######## ClusterState.load result:" + collections);
return new ClusterState( version, liveNodes, collections); return new ClusterState( version, liveNodes, collections,stateReader);
} }
/**
* @deprecated use {@link #load(Integer, byte[], Set, ZkStateReader, String)}
*/
@Deprecated
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes){
return load(version == null ? -1: version, bytes, liveNodes,null,null);
}
public static Aliases load(byte[] bytes) { public static Aliases load(byte[] bytes) {
if (bytes == null || bytes.length == 0) { if (bytes == null || bytes.length == 0) {
@ -279,7 +303,7 @@ public class ClusterState implements JSONWriter.Writable {
return new Aliases(aliasMap); return new Aliases(aliasMap);
} }
private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version) { private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
Map<String,Object> props; Map<String,Object> props;
Map<String,Slice> slices; Map<String,Slice> slices;
@ -306,7 +330,7 @@ public class ClusterState implements JSONWriter.Writable {
router = DocRouter.getDocRouter((String) routerProps.get("name")); router = DocRouter.getDocRouter((String) routerProps.get("name"));
} }
return new DocCollection(name, slices, props, router, version); return new DocCollection(name, slices, props, router, version,znode);
} }
private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) { private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) {
@ -334,7 +358,7 @@ public class ClusterState implements JSONWriter.Writable {
* *
* @return null if ClusterState was created for publication, not consumption * @return null if ClusterState was created for publication, not consumption
*/ */
public Integer getZkClusterStateVersion() { public Integer getZnodeVersion() {
return zkClusterStateVersion; return zkClusterStateVersion;
} }
@ -364,6 +388,9 @@ public class ClusterState implements JSONWriter.Writable {
} }
public ZkStateReader getStateReader(){
return stateReader;
}
/** /**
* Internal API used only by ZkStateReader * Internal API used only by ZkStateReader
@ -372,9 +399,5 @@ public class ClusterState implements JSONWriter.Writable {
this.liveNodes = liveNodes; this.liveNodes = liveNodes;
} }
public DocCollection getCommonCollection(String name){
return collectionStates.get(name);
}
} }

View File

@ -21,7 +21,6 @@ import org.noggit.JSONUtil;
import org.noggit.JSONWriter; import org.noggit.JSONWriter;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -33,15 +32,17 @@ import java.util.Map;
public class DocCollection extends ZkNodeProps { public class DocCollection extends ZkNodeProps {
public static final String DOC_ROUTER = "router"; public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards"; public static final String SHARDS = "shards";
private int version; public static final String STATE_FORMAT = "stateFormat";
private int znodeVersion;
private final String name; private final String name;
private final Map<String, Slice> slices; private final Map<String, Slice> slices;
private final Map<String, Slice> activeSlices; private final Map<String, Slice> activeSlices;
private final DocRouter router; private final DocRouter router;
private final String znode;
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) { public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
this(name, slices, props, router, -1); this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE);
} }
/** /**
@ -49,9 +50,9 @@ public class DocCollection extends ZkNodeProps {
* @param slices The logical shards of the collection. This is used directly and a copy is not made. * @param slices The logical shards of the collection. This is used directly and a copy is not made.
* @param props The properties of the slice. This is used directly and a copy is not made. * @param props The properties of the slice. This is used directly and a copy is not made.
*/ */
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion) { public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) {
super( props==null ? props = new HashMap<String,Object>() : props); super( props==null ? props = new HashMap<String,Object>() : props);
this.version = zkVersion; this.znodeVersion = zkVersion;
this.name = name; this.name = name;
this.slices = slices; this.slices = slices;
@ -65,10 +66,14 @@ public class DocCollection extends ZkNodeProps {
this.activeSlices.put(slice.getKey(), slice.getValue()); this.activeSlices.put(slice.getKey(), slice.getValue());
} }
this.router = router; this.router = router;
this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
assert name != null && slices != null; assert name != null && slices != null;
} }
public DocCollection copyWith(Map<String, Slice> slices){
return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
}
/** /**
* Return collection name. * Return collection name.
@ -110,9 +115,16 @@ public class DocCollection extends ZkNodeProps {
return activeSlices; return activeSlices;
} }
public int getVersion(){ public int getZNodeVersion(){
return version; return znodeVersion;
}
public int getStateFormat(){
return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1:2;
}
public String getZNode(){
return znode;
} }
@ -132,4 +144,12 @@ public class DocCollection extends ZkNodeProps {
all.put(SHARDS, slices); all.put(SHARDS, slices);
jsonWriter.write(all); jsonWriter.write(all);
} }
public Replica getReplica(String coreNodeName) {
for (Slice slice : slices.values()) {
Replica replica = slice.getReplica(coreNodeName);
if (replica != null) return replica;
}
return null;
}
} }

View File

@ -564,6 +564,7 @@ public class SolrZkClient {
} }
public void close() { public void close() {
// log.warn("closed inst :"+inst, new Exception("leakdebug"));
if (isClosed) return; // it's okay if we over close - same as solrcore if (isClosed) return; // it's okay if we over close - same as solrcore
isClosed = true; isClosed = true;
try { try {

View File

@ -98,10 +98,16 @@ public class ZkStateReader {
public static final String LEADER_ELECT_ZKNODE = "/leader_elect"; public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
public static final String SHARD_LEADERS_ZKNODE = "leaders"; public static final String SHARD_LEADERS_ZKNODE = "leaders";
private final Set<String> watchedCollections = new HashSet<String>();
/**These are collections which are actively watched by this instance .
*
*/
private Map<String , DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
private Set<String> allCollections = Collections.emptySet();
// //
// convenience methods... should these go somewhere else? // convenience methods... should these go somewhere else?
// //
@ -162,7 +168,8 @@ public class ZkStateReader {
log.info("path={} {}={} specified config exists in ZooKeeper", log.info("path={} {}={} specified config exists in ZooKeeper",
new Object[] {path, CONFIGNAME_PROP, configName}); new Object[] {path, CONFIGNAME_PROP, configName});
} }
} else {
throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
} }
} }
catch (KeeperException e) { catch (KeeperException e) {
@ -251,22 +258,21 @@ public class ZkStateReader {
return aliases; return aliases;
} }
/*public Boolean checkValid(String coll, int version){ public Boolean checkValid(String coll, int version){
DocCollection collection = clusterState.getCollectionOrNull(coll); DocCollection collection = clusterState.getCollectionOrNull(coll);
if(collection ==null) return null; if(collection ==null) return null;
if(collection.getVersion() < version){ if(collection.getZNodeVersion() < version){
log.info("server older than client {}<{}",collection.getVersion(),version); log.info("server older than client {}<{}",collection.getZNodeVersion(),version);
DocCollection nu = getExternCollectionFresh(this, coll); DocCollection nu = getCollectionLive(this, coll);
if(nu.getVersion()> collection.getVersion()){ if(nu.getZNodeVersion()> collection.getZNodeVersion()){
updateExternCollection(nu); updateWatchedCollection(nu);
collection = nu; collection = nu;
} }
} }
if(collection.getVersion() == version) return Boolean.TRUE; if(collection.getZNodeVersion() == version) return Boolean.TRUE;
log.info("wrong version from client {}!={} ",version, collection.getVersion()); log.debug("wrong version from client {}!={} ",version, collection.getZNodeVersion());
return Boolean.FALSE; return Boolean.FALSE;
}
}*/
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException, public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
InterruptedException { InterruptedException {
@ -299,10 +305,11 @@ public class ZkStateReader {
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat , byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
true); true);
Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes(); Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln); ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln,ZkStateReader.this, null);
// update volatile // update volatile
ZkStateReader.this.clusterState = clusterState; ZkStateReader.this.clusterState = clusterState;
updateCollectionNames();
// HashSet<String> all = new HashSet<>(colls);; // HashSet<String> all = new HashSet<>(colls);;
// all.addAll(clusterState.getAllInternalCollections()); // all.addAll(clusterState.getAllInternalCollections());
// all.remove(null); // all.remove(null);
@ -377,6 +384,7 @@ public class ZkStateReader {
liveNodeSet.addAll(liveNodes); liveNodeSet.addAll(liveNodes);
ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this); ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
this.clusterState = clusterState; this.clusterState = clusterState;
updateCollectionNames();
zkClient.exists(ALIASES, zkClient.exists(ALIASES,
new Watcher() { new Watcher() {
@ -422,6 +430,40 @@ public class ZkStateReader {
}, true); }, true);
} }
updateAliases(); updateAliases();
//on reconnect of SolrZkClient re-add watchers for the watched external collections
synchronized (this) {
for (String watchedCollection : watchedCollections) {
addZkWatch(watchedCollection);
}
}
}
public void updateCollectionNames() throws KeeperException, InterruptedException {
Set<String> colls = getExternColls();
colls.addAll(clusterState.getCollectionStates().keySet());
allCollections = Collections.unmodifiableSet(colls);
}
private Set<String> getExternColls() throws KeeperException, InterruptedException {
List<String> children = null;
try {
children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
} catch (KeeperException.NoNodeException e) {
log.warn("Error fetching collection names");
return new HashSet<>();
}
if (children == null || children.isEmpty()) return new HashSet<>();
HashSet<String> result = new HashSet<>(children.size());
for (String c : children) {
try {
if (zkClient.exists(getCollectionPath(c), true)) result.add(c);
} catch (Exception e) {
log.warn("Error reading collections nodes", e);
}
}
return result;
} }
@ -440,7 +482,7 @@ public class ZkStateReader {
liveNodesSet.addAll(liveNodes); liveNodesSet.addAll(liveNodes);
if (!onlyLiveNodes) { if (!onlyLiveNodes) {
log.info("Updating cloud state from ZooKeeper... "); log.debug("Updating cloud state from ZooKeeper... ");
clusterState = ClusterState.load(zkClient, liveNodesSet,this); clusterState = ClusterState.load(zkClient, liveNodesSet,this);
} else { } else {
@ -449,6 +491,7 @@ public class ZkStateReader {
clusterState.setLiveNodes(liveNodesSet); clusterState.setLiveNodes(liveNodesSet);
} }
this.clusterState = clusterState; this.clusterState = clusterState;
updateCollectionNames();
} }
} else { } else {
@ -507,9 +550,13 @@ public class ZkStateReader {
} }
}, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS); }, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
} }
synchronized (this) {
for (String watchedCollection : watchedCollections) {
watchedCollectionStates.put(watchedCollection, getCollectionLive(this, watchedCollection));
}
}
} }
/** /**
* @return information about the cluster from ZooKeeper * @return information about the cluster from ZooKeeper
*/ */
@ -632,6 +679,9 @@ public class ZkStateReader {
public SolrZkClient getZkClient() { public SolrZkClient getZkClient() {
return zkClient; return zkClient;
} }
public Set<String> getAllCollections(){
return allCollections;
}
public void updateAliases() throws KeeperException, InterruptedException { public void updateAliases() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData(ALIASES, null, null, true); byte[] data = zkClient.getData(ALIASES, null, null, true);
@ -678,4 +728,167 @@ public class ZkStateReader {
} }
} }
public void updateWatchedCollection(DocCollection c) {
if(watchedCollections.contains(c.getName())){
watchedCollectionStates.put(c.getName(), c);
log.info("Updated DocCollection "+c.getName()+" to: ");
}
}
/**
* <b>Advance usage</b>
* This method can be used to fetch a collection object and control whether it hits
* the cache only or if information can be looked up from ZooKeeper.
*
* @param coll the collection name
* @param cachedCopyOnly whether to fetch data from cache only or if hitting Zookeeper is acceptable
* @return the {@link org.apache.solr.common.cloud.DocCollection}
*/
public DocCollection getCollection(String coll, boolean cachedCopyOnly) {
if(clusterState.getCollectionStates().get(coll) != null) {
//this collection resides in clusterstate.json. So it's always up-to-date
return clusterState.getCollectionStates().get(coll);
}
if (watchedCollections.contains(coll) || cachedCopyOnly) {
DocCollection c = watchedCollectionStates.get(coll);
if (c != null || cachedCopyOnly) return c;
}
return getCollectionLive(this, coll);
}
private Map ephemeralCollectionData;
/**
* this is only set by Overseer not to be set by others and only set inside the Overseer node. If Overseer has
unfinished external collections which are yet to be persisted to ZK
this map is populated and this class can use that information
@param map The map reference
*/
public void setEphemeralCollectionData(Map map){
ephemeralCollectionData = map;
}
public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
String collectionPath = getCollectionPath(coll);
if(zkStateReader.ephemeralCollectionData !=null ){
ClusterState cs = (ClusterState) zkStateReader.ephemeralCollectionData.get(collectionPath);
if(cs !=null) {
return cs.getCollectionStates().get(coll);
}
}
try {
if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null;
Stat stat = new Stat();
byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
ClusterState state = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(), zkStateReader, collectionPath);
return state.getCollectionStates().get(coll);
} catch (KeeperException.NoNodeException e) {
log.warn("No node available : " + collectionPath, e);
return null;
} catch (KeeperException e) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
}
}
public DocCollection getCollection(String coll) {
return getCollection(coll, false);
}
public static String getCollectionPath(String coll) {
return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
}
public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
synchronized (this){
if(watchedCollections.contains(coll)) return;
else {
watchedCollections.add(coll);
}
addZkWatch(coll);
}
}
private void addZkWatch(final String coll) throws KeeperException, InterruptedException {
log.info("addZkWatch {}", coll);
final String fullpath = getCollectionPath(coll);
synchronized (getUpdateLock()){
cmdExecutor.ensureExists(fullpath, zkClient);
log.info("Updating collection state at {} from ZooKeeper... ",fullpath);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
// session events are not change events,
// and do not remove the watcher
if (EventType.None.equals(event.getType())) {
return;
}
log.info("A cluster state change: {}, has occurred - updating... ", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
try {
// delayed approach
// ZkStateReader.this.updateClusterState(false, false);
synchronized (ZkStateReader.this.getUpdateLock()) {
if(!watchedCollections.contains(coll)) {
log.info("Unwatched collection {}",coll);
return;
}
// remake watch
final Watcher thisWatch = this;
Stat stat = new Stat();
byte[] data = zkClient.getData(fullpath, thisWatch, stat, true);
if(data == null || data.length ==0){
log.warn("No value set for collection state : {}", coll);
return;
}
ClusterState clusterState = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(),ZkStateReader.this,fullpath);
// update volatile
DocCollection newState = clusterState.getCollectionStates().get(coll);
watchedCollectionStates.put(coll, newState);
log.info("Updating data for {} to ver {} ", coll , newState.getZNodeVersion());
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("Unwatched collection :"+coll , e);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
"", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Unwatched collection :"+coll , e);
return;
}
}
};
zkClient.exists(fullpath, watcher, true);
}
watchedCollectionStates.put(coll, getCollectionLive(this, coll));
}
/**This is not a public API. Only used by ZkController */
public void removeZKWatch(final String coll){
synchronized (this){
watchedCollections.remove(coll);
}
}
} }

View File

@ -20,7 +20,6 @@ package org.apache.solr.client.solrj.impl;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase; import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase; import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
@ -51,7 +49,6 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
@ -62,7 +59,6 @@ import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -125,6 +121,7 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase {
@Override @Override
public void doTest() throws Exception { public void doTest() throws Exception {
allTests(); allTests();
stateVersionParamTest();
} }
private void allTests() throws Exception { private void allTests() throws Exception {
@ -346,7 +343,77 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase {
SolrInputDocument doc = getDoc(fields); SolrInputDocument doc = getDoc(fields);
indexDoc(doc); indexDoc(doc);
} }
private void stateVersionParamTest() throws Exception {
CloudSolrServer client = createCloudClient(null);
try {
String collectionName = "checkStateVerCol";
createCollection(collectionName, client, 2, 2);
waitForRecoveriesToFinish(collectionName, false);
DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
HttpSolrServer httpSolrServer = new HttpSolrServer(r.getStr(ZkStateReader.BASE_URL_PROP) + "/"+collectionName);
SolrQuery q = new SolrQuery().setQuery("*:*");
log.info("should work query, result {}", httpSolrServer.query(q));
//no problem
q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
log.info("2nd query , result {}", httpSolrServer.query(q));
//no error yet good
q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+ (coll.getZNodeVersion() -1)); //an older version expect error
HttpSolrServer.RemoteSolrException sse = null;
try {
httpSolrServer.query(q);
log.info("expected query error");
} catch (HttpSolrServer.RemoteSolrException e) {
sse = e;
}
httpSolrServer.shutdown();
assertNotNull(sse);
assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
//now send the request to another node that does n ot serve the collection
Set<String> allNodesOfColl = new HashSet<>();
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
}
}
String theNode = null;
for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
String n = client.getZkStateReader().getBaseUrlForNodeName(s);
if(!allNodesOfColl.contains(s)){
theNode = n;
break;
}
}
log.info("thenode which does not serve this collection{} ",theNode);
assertNotNull(theNode);
httpSolrServer = new HttpSolrServer(theNode + "/"+collectionName);
q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
try {
httpSolrServer.query(q);
log.info("error was expected");
} catch (HttpSolrServer.RemoteSolrException e) {
sse = e;
}
httpSolrServer.shutdown();
assertNotNull(sse);
assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
} finally {
client.shutdown();
}
}
public void testShutdown() throws MalformedURLException { public void testShutdown() throws MalformedURLException {
CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332"); CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332");
try { try {

View File

@ -339,6 +339,19 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
return createJettys(numJettys, false); return createJettys(numJettys, false);
} }
protected int defaultStateFormat = 1 + random().nextInt(2);
protected int getStateFormat() {
String stateFormat = System.getProperty("tests.solr.stateFormat", null);
if (stateFormat != null) {
if ("2".equals(stateFormat)) {
return defaultStateFormat = 2;
} else if ("1".equals(stateFormat)) {
return defaultStateFormat = 1;
}
}
return defaultStateFormat; // random
}
/** /**
* @param checkCreatedVsState * @param checkCreatedVsState
@ -351,6 +364,17 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
List<SolrServer> clients = new ArrayList<>(); List<SolrServer> clients = new ArrayList<>();
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
if(getStateFormat() == 2) {
log.info("Creating collection1 with stateFormat=2");
SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(ZkNodeProps.makeMap(
Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.CREATECOLLECTION,
"name", DEFAULT_COLLECTION,
"numShards", String.valueOf(sliceCount),
DocCollection.STATE_FORMAT, getStateFormat())));
zkClient.close();
}
for (int i = 1; i <= numJettys; i++) { for (int i = 1; i <= numJettys; i++) {
if (sb.length() > 0) sb.append(','); if (sb.length() > 0) sb.append(',');
int cnt = this.jettyIntCntr.incrementAndGet(); int cnt = this.jettyIntCntr.incrementAndGet();
@ -1490,6 +1514,10 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
collectionInfos.put(collectionName, list); collectionInfos.put(collectionName, list);
} }
params.set("name", collectionName); params.set("name", collectionName);
if (getStateFormat() == 2) {
log.info("Creating collection with stateFormat=2: " + collectionName);
params.set(DocCollection.STATE_FORMAT, "2");
}
SolrRequest request = new QueryRequest(params); SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections"); request.setPath("/admin/collections");