split clusterstate.json SOLR-5473, SOLR-5474, SOLR-5810

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1624556 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2014-09-12 14:11:17 +00:00
parent d2447ca5ff
commit 7888a43541
14 changed files with 1033 additions and 175 deletions

View File

@ -144,6 +144,13 @@ New Features
* SOLR-5098: Schema API: Add REST support for adding field types to the schema.
(Timothy Potter)
* SOLR-5473 : Split clusterstate.json per collection and watch states selectively
(Noble Paul, Mark Miller, shalin, Jessica Cheng Mallet, Timothy Potter, Ashum Gupta)
* SOLR-5474 : Support for SOLR-5473 in SolrJ (Timothy Potter, Noble Paul, Mark Miller)
* SOLR-5810 : Support for SOLR-5473 in solr admin UI (Timothy Potter, Noble Paul)
Bug Fixes
----------------------

View File

@ -108,6 +108,8 @@ public class Overseer implements Closeable {
private Map clusterProps;
private boolean isClosed = false;
private final Map<String, Object> updateNodes = new LinkedHashMap<>();
private boolean isClusterStateModified = false;
public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
@ -261,6 +263,7 @@ public class Overseer implements Closeable {
stateUpdateQueue.poll();
if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
if(!updateNodes.isEmpty()) break;
// if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100);
}
@ -300,8 +303,30 @@ public class Overseer implements Closeable {
TimerContext timerContext = stats.time("update_state");
boolean success = false;
try {
zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true);
lastUpdatedTime = System.nanoTime();
if (!updateNodes.isEmpty()) {
for (Entry<String,Object> e : updateNodes.entrySet()) {
if (e.getValue() == null) {
if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true);
} else {
byte[] data = ZkStateReader.toJSON(e.getValue());
if (zkClient.exists(e.getKey(), true)) {
log.info("going to update_collection {}", e.getKey());
zkClient.setData(e.getKey(), data, true);
} else {
log.info("going to create_collection {}", e.getKey(), new String(data));
zkClient.create(e.getKey(), data, CreateMode.PERSISTENT, true);
}
}
}
updateNodes.clear();
}
if (isClusterStateModified) {
lastUpdatedTime = System.nanoTime();
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
isClusterStateModified = false;
}
success = true;
} finally {
timerContext.stop();
@ -703,7 +728,7 @@ public class Overseer implements Closeable {
}
Slice slice = clusterState.getSlice(collection, sliceName);
Map<String,Object> replicaProps = new LinkedHashMap<>();
replicaProps.putAll(message.getProperties());
@ -721,7 +746,7 @@ public class Overseer implements Closeable {
replicaProps.remove(ZkStateReader.SHARD_ID_PROP);
replicaProps.remove(ZkStateReader.COLLECTION_PROP);
replicaProps.remove(QUEUE_OPERATION);
// remove any props with null values
Set<Entry<String,Object>> entrySet = replicaProps.entrySet();
List<String> removeKeys = new ArrayList<>();
@ -869,10 +894,22 @@ public class Overseer implements Closeable {
}
collectionProps.put(DocCollection.DOC_ROUTER, routerSpec);
if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true");
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
return newState(state, singletonMap(newCollection.getName(), newCollection));
if (message.getStr("fromApi") == null) {
collectionProps.put("autoCreated", "true");
}
String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null
: ZkStateReader.getCollectionPath(collectionName);
DocCollection newCollection = new DocCollection(collectionName,
newSlices, collectionProps, router, -1, znode);
isClusterStateModified = true;
log.info("state version {} {}", collectionName, newCollection.getStateFormat());
return newState(state, singletonMap(newCollection.getName(), newCollection));
}
/*
* Return an already assigned id or null if not assigned
@ -909,30 +946,28 @@ public class Overseer implements Closeable {
}
return null;
}
private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
// System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
// System.out.println("Updating slice:" + slice);
DocCollection newCollection = null;
DocCollection coll = state.getCollectionOrNull(collectionName) ;
Map<String,Slice> slices;
Map<String,Object> props;
DocRouter router;
if (coll == null) {
// when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
// without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
slices = new HashMap<>(1);
props = new HashMap<>(1);
slices = new LinkedHashMap<>(1);
slices.put(slice.getName(), slice);
Map<String,Object> props = new HashMap<>(1);
props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
router = new ImplicitDocRouter();
newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
} else {
props = coll.getProperties();
router = coll.getRouter();
slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy
slices.put(slice.getName(), slice);
newCollection = coll.copyWithSlices(slices);
}
slices.put(slice.getName(), slice);
DocCollection newCollection = new DocCollection(collectionName, slices, props, router);
// System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
@ -991,27 +1026,52 @@ public class Overseer implements Closeable {
}
DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter());
DocCollection newCollection = coll.copyWithSlices(slices);
return newState(state, singletonMap(collectionName, newCollection));
}
private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
return state.copyWith(colls);
}
/*
* Remove collection from cloudstate
*/
private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
final String collection = message.getStr("name");
if (!checkKeyExistence(message, "name")) return clusterState;
DocCollection coll = clusterState.getCollectionOrNull(collection);
if(coll !=null) {
return clusterState.copyWith(singletonMap(collection,(DocCollection)null));
private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
for (Entry<String, DocCollection> e : colls.entrySet()) {
DocCollection c = e.getValue();
if (c == null) {
isClusterStateModified = true;
state = state.copyWith(singletonMap(e.getKey(), (DocCollection) null));
updateNodes.put(ZkStateReader.getCollectionPath(e.getKey()) ,null);
continue;
}
return clusterState;
}
if (c.getStateFormat() > 1) {
updateNodes.put(ZkStateReader.getCollectionPath(c.getName()),
new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c)));
} else {
isClusterStateModified = true;
}
state = state.copyWith(singletonMap(e.getKey(), c));
}
return state;
}
/*
* Remove collection from cloudstate
*/
private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
final String collection = message.getStr("name");
if (!checkKeyExistence(message, "name")) return clusterState;
DocCollection coll = clusterState.getCollectionOrNull(collection);
if(coll == null) return clusterState;
isClusterStateModified = true;
if (coll.getStateFormat() > 1) {
try {
log.info("Deleting state for collection : {}", collection);
zkClient.delete(ZkStateReader.getCollectionPath(collection), -1, true);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to remove collection state :" + collection);
}
}
return newState(clusterState, singletonMap(coll.getName(),(DocCollection) null));
}
/*
* Remove collection slice from cloudstate
*/
@ -1027,7 +1087,7 @@ public class Overseer implements Closeable {
Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlicesMap());
newSlices.remove(sliceId);
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
DocCollection newCollection = coll.copyWithSlices(newSlices);
return newState(clusterState, singletonMap(collection,newCollection));
}
@ -1039,8 +1099,6 @@ public class Overseer implements Closeable {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return clusterState;
// final Map<String, DocCollection> newCollections = new LinkedHashMap<>(clusterState.getCollectionStates()); // shallow copy
// DocCollection coll = newCollections.get(collection);
DocCollection coll = clusterState.getCollectionOrNull(collection) ;
if (coll == null) {
// TODO: log/error that we didn't find it?
@ -1078,7 +1136,7 @@ public class Overseer implements Closeable {
newSlices.put(slice.getName(), slice);
}
}
if (lastSlice) {
// remove all empty pre allocated slices
for (Slice slice : coll.getSlices()) {
@ -1095,7 +1153,7 @@ public class Overseer implements Closeable {
// but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
// ZkController out of the Overseer.
try {
zkClient.clean("/collections/" + collection);
zkClient.delete("/collections/" + collection, -1, true);
} catch (InterruptedException e) {
SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
Thread.currentThread().interrupt();
@ -1105,8 +1163,8 @@ public class Overseer implements Closeable {
return newState(clusterState,singletonMap(collection, (DocCollection) null));
} else {
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
return newState(clusterState,singletonMap(collection,newCollection));
DocCollection newCollection = coll.copyWithSlices(newSlices);
return newState(clusterState,singletonMap(collection,newCollection));
}
}

View File

@ -698,7 +698,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
// convert cluster state into a map of writable types
byte[] bytes = ZkStateReader.toJSON(clusterState);
Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
Map<String, Object> stateMap = (Map<String,Object>) ZkStateReader.fromJSON(bytes);
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
NamedList<Object> collectionProps = new SimpleOrderedMap<Object>();
@ -706,7 +706,13 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
Set<String> collections = clusterState.getCollections();
for (String name : collections) {
Map<String, Object> collectionStatus = null;
collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, shard);
if (clusterState.getCollection(name).getStateFormat() > 1) {
bytes = ZkStateReader.toJSON(clusterState.getCollection(name));
Map<String, Object> docCollection = (Map<String,Object>) ZkStateReader.fromJSON(bytes);
collectionStatus = getCollectionStatus(docCollection, name, shard);
} else {
collectionStatus = getCollectionStatus((Map<String,Object>) stateMap.get(name), name, shard);
}
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
collectionStatus.put("aliases", collectionVsAliases.get(name));
}
@ -715,8 +721,12 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
} else {
String routeKey = message.getStr(ShardParams._ROUTE_);
Map<String, Object> docCollection = null;
docCollection = (Map<String, Object>) stateMap.get(collection);
if (clusterState.getCollection(collection).getStateFormat() > 1) {
bytes = ZkStateReader.toJSON(clusterState.getCollection(collection));
docCollection = (Map<String,Object>) ZkStateReader.fromJSON(bytes);
} else {
docCollection = (Map<String,Object>) stateMap.get(collection);
}
if (routeKey == null) {
Map<String, Object> collectionStatus = getCollectionStatus(docCollection, collection, shard);
if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty()) {
@ -2409,11 +2419,15 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
}
if(configName!= null){
log.info("creating collections conf node {} ",ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll);
zkStateReader.getZkClient().makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll,
ZkStateReader.toJSON(ZkNodeProps.makeMap(ZkController.CONFIGNAME_PROP,configName)),true );
if (configName != null) {
String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
log.info("creating collections conf node {} ", collDir);
byte[] data = ZkStateReader.toJSON(ZkNodeProps.makeMap(ZkController.CONFIGNAME_PROP, configName));
if (zkStateReader.getZkClient().exists(collDir, true)) {
zkStateReader.getZkClient().setData(collDir, data, true);
} else {
zkStateReader.getZkClient().makePath(collDir, data, true);
}
} else {
if(isLegacyCloud){
log.warn("Could not obtain config name");

View File

@ -1165,6 +1165,19 @@ public final class ZkController {
}
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
boolean removeWatch = true;
// if there is no SolrCore which is a member of this collection, remove the watch
for (SolrCore solrCore : cc.getCores()) {
CloudDescriptor cloudDesc = solrCore.getCoreDescriptor()
.getCloudDescriptor();
if (cloudDesc != null
&& cloudDescriptor.getCollectionName().equals(
cloudDesc.getCollectionName())) {
removeWatch = false;
break;
}
}
if (removeWatch) zkStateReader.removeZKWatch(collection);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(),
@ -1466,6 +1479,11 @@ public final class ZkController {
}
publish(cd, ZkStateReader.DOWN, false, true);
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
if(collection !=null && collection.getStateFormat()>1 ){
log.info("Registering watch for external collection {}",cd.getCloudDescriptor().getCollectionName());
zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
}
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);

View File

@ -24,6 +24,7 @@ import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATESHARD;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionProcessor.DELETEREPLICA;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
@ -59,6 +60,7 @@ import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
@ -142,7 +144,7 @@ public class CollectionsHandler extends RequestHandlerBase {
if (action == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a);
}
switch (action) {
case CREATE: {
this.handleCreateAction(req, rsp);
@ -468,15 +470,16 @@ public class CollectionsHandler extends RequestHandlerBase {
Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATECOLLECTION,
"fromApi","true");
copyIfNotNull(req.getParams(), props,
"name",
ZkStateReader.REPLICATION_FACTOR,
copyIfNotNull(req.getParams(),props,
"name",
REPLICATION_FACTOR,
COLL_CONF,
NUM_SLICES,
MAX_SHARDS_PER_NODE,
CREATE_NODE_SET,
SHARDS_PROP,
ASYNC,
DocCollection.STATE_FORMAT,
AUTO_ADD_REPLICAS,
"router.");
@ -670,4 +673,9 @@ public class CollectionsHandler extends RequestHandlerBase {
public String getDescription() {
return "Manage SolrCloud Collections";
}
@Override
public String getSource() {
return "$URL: https://svn.apache.org/repos/asf/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionHandler.java $";
}
}

View File

@ -318,6 +318,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
// don't proxy for internal update requests
SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
checkStateIsValid(cores, queryParams.get(CloudSolrServer.STATE_VERSION));
if (coreUrl != null
&& queryParams
.get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
@ -373,6 +374,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
if( "/select".equals( path ) || "/select/".equals( path ) ) {
solrReq = parser.parse( core, path, req );
checkStateIsValid(cores,solrReq.getParams().get(CloudSolrServer.STATE_VERSION));
String qt = solrReq.getParams().get( CommonParams.QT );
handler = core.getRequestHandler( qt );
if( handler == null ) {
@ -462,6 +464,22 @@ public class SolrDispatchFilter extends BaseSolrFilter {
chain.doFilter(request, response);
}
private void checkStateIsValid(CoreContainer cores, String stateVer) {
if (stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware()) {
// many have multiple collections separated by |
String[] pairs = StringUtils.split(stateVer, '|');
for (String pair : pairs) {
String[] pcs = StringUtils.split(pair, ':');
if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) {
Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0], Integer.parseInt(pcs[1]));
if (Boolean.TRUE != status) {
throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair + "valid : " + status);
}
}
}
}
}
private void processAliases(SolrQueryRequest solrReq, Aliases aliases,
List<String> collectionsList) {

View File

@ -24,7 +24,6 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@ -35,9 +34,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.lucene.util.BytesRef;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.noggit.ObjectBuilder;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
@ -46,14 +42,15 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.FastWriter;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.noggit.ObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Zookeeper Info
*
@ -90,7 +87,6 @@ public final class ZookeeperInfoServlet extends HttpServlet {
String path = params.get("path");
String addr = params.get("addr");
boolean all = "true".equals(params.get("all"));
if (addr != null && addr.length() == 0) {
addr = null;
@ -110,6 +106,7 @@ public final class ZookeeperInfoServlet extends HttpServlet {
ZKPrinter printer = new ZKPrinter(response, out, cores.getZkController(), addr);
printer.detail = detail;
printer.dump = dump;
printer.isTreeView = (params.get("wt") == null); // this is hacky but tree view requests don't come in with the wt set
try {
printer.print(path);
@ -140,6 +137,8 @@ public final class ZookeeperInfoServlet extends HttpServlet {
boolean detail = false;
boolean dump = false;
boolean isTreeView = false;
String addr; // the address passed to us
String keeperAddr; // the address we're connected to
@ -385,6 +384,47 @@ public final class ZookeeperInfoServlet extends HttpServlet {
dataStrErr = "data is not parsable as a utf8 String: " + e.toString();
}
}
// pull in external collections too
if (ZkStateReader.CLUSTER_STATE.equals(path) && !isTreeView) {
SortedMap<String,Object> collectionStates = null;
List<String> children = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, null, true);
java.util.Collections.sort(children);
for (String collection : children) {
String collStatePath = ZkStateReader.getCollectionPath(collection);
String childDataStr = null;
try {
byte[] childData = zkClient.getData(collStatePath, null, null, true);
if (childData != null) {
childDataStr = (new BytesRef(childData)).utf8ToString();
}
} catch (KeeperException.NoNodeException nne) {
// safe to ignore
} catch (Exception childErr) {
log.error("Failed to get "+collStatePath+" due to: "+childErr);
}
if (childDataStr != null) {
if (collectionStates == null) {
// initialize lazily as there may not be any external collections
collectionStates = new TreeMap<>();
// add the internal collections
if (dataStr != null)
collectionStates.putAll((Map<String,Object>)ObjectBuilder.fromJSON(dataStr));
}
// now add in the external collections
Map<String,Object> extColl = (Map<String,Object>)ObjectBuilder.fromJSON(childDataStr);
collectionStates.put(collection, extColl.get(collection));
}
}
if (collectionStates != null) {
CharArr out = new CharArr();
new JSONWriter(out, 2).write(collectionStates);
dataStr = out.toString();
}
}
json.writeString("znode");
json.writeNameSeparator();

View File

@ -0,0 +1,115 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
public class ExternalCollectionsTest extends AbstractFullDistribZkTestBase {
private CloudSolrServer client;
@BeforeClass
public static void beforeThisClass2() throws Exception {
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
System.setProperty("numShards", Integer.toString(sliceCount));
System.setProperty("solr.xml.persist", "true");
client = createCloudClient(null);
}
@After
public void tearDown() throws Exception {
super.tearDown();
client.shutdown();
}
protected String getSolrXml() {
return "solr-no-core.xml";
}
public ExternalCollectionsTest() {
fixShardCount = true;
sliceCount = 2;
shardCount = 4;
checkCreatedVsState = false;
}
@Override
public void doTest() throws Exception {
testZkNodeLocation();
}
@Override
protected int getStateFormat() {
return 2;
}
private void testZkNodeLocation() throws Exception{
String collectionName = "myExternColl";
createCollection(collectionName, client, 2, 2);
waitForRecoveriesToFinish(collectionName, false);
assertTrue("does not exist collection state externally",
cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
Stat stat = new Stat();
byte[] data = cloudClient.getZkStateReader().getZkClient().getData(ZkStateReader.getCollectionPath(collectionName), null, stat, true);
DocCollection c = ZkStateReader.getCollectionLive(cloudClient.getZkStateReader(), collectionName);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
assertEquals("The zkversion of the nodes must be same zkver:" + stat.getVersion() , stat.getVersion(),clusterState.getCollection(collectionName).getZNodeVersion() );
assertTrue("DocCllection#getStateFormat() must be > 1", cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getStateFormat() > 1);
// remove collection
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.DELETE.toString());
params.set("name", collectionName);
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
if (client == null) {
client = createCloudClient(null);
}
client.request(request);
checkForMissingCollection(collectionName);
assertFalse("collection state should not exist externally", cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
}
}

View File

@ -18,6 +18,8 @@ package org.apache.solr.client.solrj.impl;
*/
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -29,13 +31,16 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
@ -66,6 +71,8 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SolrJ client class to communicate with SolrCloud.
@ -78,6 +85,8 @@ import org.apache.zookeeper.KeeperException;
* with {@link #setIdField(String)}.
*/
public class CloudSolrServer extends SolrServer {
private static final Logger log = LoggerFactory.getLogger(CloudSolrServer.class);
private volatile ZkStateReader zkStateReader;
private String zkHost; // the zk server address
private int zkConnectTimeout = 10000;
@ -86,6 +95,8 @@ public class CloudSolrServer extends SolrServer {
private final LBHttpSolrServer lbServer;
private final boolean shutdownLBHttpSolrServer;
private HttpClient myClient;
//no of times collection state to be reloaded if stale state error is received
private static final int MAX_STALE_RETRIES = 5;
Random rand = new Random();
private final boolean updatesToLeaders;
@ -94,6 +105,7 @@ public class CloudSolrServer extends SolrServer {
.newCachedThreadPool(new SolrjNamedThreadFactory(
"CloudSolrServer ThreadPool"));
private String idField = "id";
public static final String STATE_VERSION = "_stateVer_";
private final Set<String> NON_ROUTABLE_PARAMS;
{
NON_ROUTABLE_PARAMS = new HashSet<>();
@ -111,6 +123,36 @@ public class CloudSolrServer extends SolrServer {
// NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
}
private volatile long timeToLive = 60* 1000L;
protected Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
@Override
public ExpiringCachedDocCollection get(Object key) {
ExpiringCachedDocCollection val = super.get(key);
if(val == null) return null;
if(val.isExpired(timeToLive)) {
super.remove(key);
return null;
}
return val;
}
};
class ExpiringCachedDocCollection {
DocCollection cached;
long cachedAt;
ExpiringCachedDocCollection(DocCollection cached) {
this.cached = cached;
this.cachedAt = System.currentTimeMillis();
}
boolean isExpired(long timeToLive) {
return (System.currentTimeMillis() - cachedAt) > timeToLive;
}
}
/**
* Create a new client object that connects to Zookeeper and is always aware
@ -133,7 +175,15 @@ public class CloudSolrServer extends SolrServer {
* "zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181"
*/
public CloudSolrServer(String zkHost) {
this(zkHost, true);
this.zkHost = zkHost;
this.myClient = HttpClientUtil.createClient(null);
this.lbServer = new LBHttpSolrServer(myClient);
this.lbServer.setRequestWriter(new BinaryRequestWriter());
this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = true;
shutdownLBHttpSolrServer = true;
setupStateVerParamOnQueryString(lbServer);
}
/**
@ -151,6 +201,15 @@ public class CloudSolrServer extends SolrServer {
this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = true;
setupStateVerParamOnQueryString(lbServer);
}
/**Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
* @param seconds ttl value in seconds
*/
public void setCollectionCacheTTl(int seconds){
assert seconds > 0;
timeToLive = seconds*1000L;
}
/**
@ -178,8 +237,24 @@ public class CloudSolrServer extends SolrServer {
this.lbServer = lbServer;
this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = false;
setupStateVerParamOnQueryString(lbServer);
}
/**
* Used internally to setup the _stateVer_ param to be sent in the query string of requests
* coming from this instance.
*/
protected void setupStateVerParamOnQueryString(LBHttpSolrServer lbServer) {
// setup the stateVer param to be passed in the query string of every request
Set<String> queryStringParams = lbServer.getQueryParams();
if (queryStringParams == null) {
queryStringParams = new HashSet<String>(2);
lbServer.setQueryParams(queryStringParams);
}
queryStringParams.add(STATE_VERSION);
}
public ResponseParser getParser() {
return lbServer.getParser();
}
@ -317,7 +392,7 @@ public class CloudSolrServer extends SolrServer {
}
}
DocCollection col = clusterState.getCollection(collection);
DocCollection col = getDocCollection(clusterState, collection);
DocRouter router = col.getRouter();
@ -534,7 +609,146 @@ public class CloudSolrServer extends SolrServer {
}
@Override
public NamedList<Object> request(SolrRequest request)
public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
SolrParams reqParams = request.getParams();
String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection();
return requestWithRetryOnStaleState(request, 0, collection);
}
/**
* As this class doesn't watch external collections on the client side,
* there's a chance that the request will fail due to cached stale state,
* which means the state must be refreshed from ZK and retried.
*/
protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
throws SolrServerException, IOException {
connect(); // important to call this before you start working with the ZkStateReader
// build up a _stateVer_ param to pass to the server containing all of the
// external collection state versions involved in this request, which allows
// the server to notify us that our cached state for one or more of the external
// collections is stale and needs to be refreshed ... this code has no impact on internal collections
String stateVerParam = null;
List<DocCollection> requestedCollections = null;
if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests
Set<String> requestedCollectionNames = getCollectionList(getZkStateReader().getClusterState(), collection);
StringBuilder stateVerParamBuilder = null;
for (String requestedCollection : requestedCollectionNames) {
// track the version of state we're using on the client side using the _stateVer_ param
DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
int collVer = coll.getZNodeVersion();
if (coll.getStateFormat()>1) {
if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
requestedCollections.add(coll);
if (stateVerParamBuilder == null) {
stateVerParamBuilder = new StringBuilder();
} else {
stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
}
stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
}
}
if (stateVerParamBuilder != null) {
stateVerParam = stateVerParamBuilder.toString();
}
}
if (request.getParams() instanceof ModifiableSolrParams) {
ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
if (stateVerParam != null) {
params.set(STATE_VERSION, stateVerParam);
} else {
params.remove(STATE_VERSION);
}
} // else: ??? how to set this ???
NamedList<Object> resp = null;
try {
resp = sendRequest(request);
} catch (Exception exc) {
Throwable rootCause = SolrException.getRootCause(exc);
// don't do retry support for admin requests or if the request doesn't have a collection specified
if (collection == null || request.getPath().startsWith("/admin")) {
if (exc instanceof SolrServerException) {
throw (SolrServerException)exc;
} else if (exc instanceof IOException) {
throw (IOException)exc;
}else if (exc instanceof RuntimeException) {
throw (RuntimeException) exc;
}
else {
throw new SolrServerException(rootCause);
}
}
int errorCode = (rootCause instanceof SolrException) ?
((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
log.error("Request to collection {} failed due to ("+errorCode+
") {}, retry? "+retryCount, collection, rootCause.toString());
boolean wasCommError =
(rootCause instanceof ConnectException ||
rootCause instanceof ConnectTimeoutException ||
rootCause instanceof NoHttpResponseException ||
rootCause instanceof SocketException);
boolean stateWasStale = false;
if (retryCount < MAX_STALE_RETRIES &&
!requestedCollections.isEmpty() &&
SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
{
// cached state for one or more external collections was stale
// re-issue request using updated state
stateWasStale = true;
// just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
for (DocCollection ext : requestedCollections) {
collectionStateCache.remove(ext.getName());
}
}
// if we experienced a communication error, it's worth checking the state
// with ZK just to make sure the node we're trying to hit is still part of the collection
if (retryCount < MAX_STALE_RETRIES && !stateWasStale && !requestedCollections.isEmpty() && wasCommError) {
for (DocCollection ext : requestedCollections) {
DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName());
if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
// looks like we couldn't reach the server because the state was stale == retry
stateWasStale = true;
// we just pulled state from ZK, so update the cache so that the retry uses it
collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
}
}
}
requestedCollections.clear(); // done with this
// if the state was stale, then we retry the request once with new state pulled from Zk
if (stateWasStale) {
log.warn("Re-trying request to collection(s) "+collection+" after stale state error from server.");
resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
} else {
if (exc instanceof SolrServerException) {
throw (SolrServerException)exc;
} else if (exc instanceof IOException) {
throw (IOException)exc;
} else {
throw new SolrServerException(rootCause);
}
}
}
return resp;
}
protected NamedList<Object> sendRequest(SolrRequest request)
throws SolrServerException, IOException {
connect();
@ -594,7 +808,7 @@ public class CloudSolrServer extends SolrServer {
// add it to the Map of slices.
Map<String,Slice> slices = new HashMap<>();
for (String collectionName : collectionsList) {
DocCollection col = clusterState.getCollection(collectionName);
DocCollection col = getDocCollection(clusterState, collectionName);
Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
ClientUtils.addSlices(slices, collectionName, routeSlices, true);
}
@ -654,14 +868,16 @@ public class CloudSolrServer extends SolrServer {
theUrlList = new ArrayList<>(urlList.size());
theUrlList.addAll(urlList);
}
if(theUrlList.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Not enough nodes to handle the request");
}
Collections.shuffle(theUrlList, rand);
if (sendToLeaders) {
ArrayList<String> theReplicas = new ArrayList<>(
replicasList.size());
theReplicas.addAll(replicasList);
Collections.shuffle(theReplicas, rand);
// System.out.println("leaders:" + theUrlList);
// System.out.println("replicas:" + theReplicas);
theUrlList.addAll(theReplicas);
}
@ -690,10 +906,13 @@ public class CloudSolrServer extends SolrServer {
collectionsList.addAll(aliasList);
continue;
}
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
DocCollection docCollection = getDocCollection(clusterState, collection);
if (docCollection == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
}
}
collectionsList.add(collectionName);
}
return collectionsList;
@ -730,6 +949,19 @@ public class CloudSolrServer extends SolrServer {
return updatesToLeaders;
}
protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null;
if (cachedState != null && cachedState.cached != null) {
return cachedState.cached;
}
DocCollection col = clusterState.getCollectionOrNull(collection);
if(col == null ) return null;
collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
return col;
}
/**
* Useful for determining the minimum achieved replication factor across
* all shards involved in processing an update request, typically useful

View File

@ -26,11 +26,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.noggit.JSONWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,9 +40,9 @@ import org.slf4j.LoggerFactory;
public class ClusterState implements JSONWriter.Writable {
private static Logger log = LoggerFactory.getLogger(ClusterState.class);
private Integer zkClusterStateVersion;
private Integer znodeVersion;
private final Map<String, DocCollection> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
private final Map<String, CollectionRef> collectionStates;
private Set<String> liveNodes;
@ -59,30 +57,42 @@ public class ClusterState implements JSONWriter.Writable {
this(null, liveNodes, collectionStates);
}
/**
* Use this constr when ClusterState is meant for consumption.
*/
public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
public ClusterState(Integer znodeVersion, Set<String> liveNodes,
Map<String, DocCollection> collectionStates) {
this.zkClusterStateVersion = zkClusterStateVersion;
this.liveNodes = new HashSet<>(liveNodes.size());
this.liveNodes.addAll(liveNodes);
this.collectionStates = new LinkedHashMap<>(collectionStates.size());
this.collectionStates.putAll(collectionStates);
this(liveNodes, getRefMap(collectionStates),znodeVersion);
}
private static Map<String, CollectionRef> getRefMap(Map<String, DocCollection> collectionStates) {
Map<String, CollectionRef> collRefs = new LinkedHashMap<>(collectionStates.size());
for (Entry<String, DocCollection> entry : collectionStates.entrySet()) {
final DocCollection c = entry.getValue();
collRefs.put(entry.getKey(), new CollectionRef(c));
}
return collRefs;
}
/**Use this if all the collection states are not readily available and some needs to be lazily loaded
*/
public ClusterState(Set<String> liveNodes, Map<String, CollectionRef> collectionStates, Integer znodeVersion){
this.znodeVersion = znodeVersion;
this.liveNodes = new HashSet<>(liveNodes.size());
this.liveNodes.addAll(liveNodes);
this.collectionStates = new LinkedHashMap<>(collectionStates);
}
public ClusterState copyWith(Map<String,DocCollection> modified){
ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates);
ClusterState result = new ClusterState(liveNodes, new LinkedHashMap<>(collectionStates), znodeVersion);
for (Entry<String, DocCollection> e : modified.entrySet()) {
DocCollection c = e.getValue();
if(c == null) {
result.collectionStates.remove(e.getKey());
continue;
}
result.collectionStates.put(c.getName(), c);
result.collectionStates.put(c.getName(), new CollectionRef(c));
}
return result;
}
@ -117,7 +127,7 @@ public class ClusterState implements JSONWriter.Writable {
* coreNodeName is the same as replicaName
*/
public Replica getReplica(final String collection, final String coreNodeName) {
return getReplica(collectionStates.get(collection), coreNodeName);
return getReplica(getCollectionOrNull(collection), coreNodeName);
}
/**
@ -165,7 +175,8 @@ public class ClusterState implements JSONWriter.Writable {
public DocCollection getCollectionOrNull(String coll) {
return collectionStates.get(coll);
CollectionRef ref = collectionStates.get(coll);
return ref == null? null:ref.get();
}
/**
@ -175,12 +186,6 @@ public class ClusterState implements JSONWriter.Writable {
return collectionStates.keySet();
}
/**
* @return Map&lt;collectionName, Map&lt;sliceName,Slice&gt;&gt;
*/
public Map<String, DocCollection> getCollectionStates() {
return Collections.unmodifiableMap(collectionStates);
}
/**
* Get names of the currently live nodes.
@ -194,14 +199,14 @@ public class ClusterState implements JSONWriter.Writable {
}
public String getShardId(String collectionName, String nodeName, String coreName) {
Collection<DocCollection> states = collectionStates.values();
Collection<CollectionRef> states = collectionStates.values();
if (collectionName != null) {
DocCollection c = getCollectionOrNull(collectionName);
if (c != null) states = Collections.singletonList(c);
CollectionRef c = collectionStates.get(collectionName);
if (c != null) states = Collections.singletonList( c );
}
for (DocCollection coll : states) {
for (Slice slice : coll.getSlices()) {
for (CollectionRef coll : states) {
for (Slice slice : coll.get().getSlices()) {
for (Replica replica : slice.getReplicas()) {
// TODO: for really large clusters, we could 'index' on this
String rnodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
@ -215,10 +220,10 @@ public class ClusterState implements JSONWriter.Writable {
return null;
}
public String getShardIdByCoreNodeName(String collectionName, String coreNodeName) {
/*public String getShardIdByCoreNodeName(String collectionName, String coreNodeName) {
Collection<DocCollection> states = collectionStates.values();
if (collectionName != null) {
DocCollection c = getCollectionOrNull(collectionName);
CollectionRef c = collectionStates.get(collectionName);
if (c != null) states = Collections.singletonList(c);
}
@ -232,7 +237,7 @@ public class ClusterState implements JSONWriter.Writable {
}
}
return null;
}
}*/
/**
* Check if node is alive.
@ -249,42 +254,31 @@ public class ClusterState implements JSONWriter.Writable {
return sb.toString();
}
/**
* Create ClusterState by reading the current state from zookeeper.
*/
public static ClusterState load(SolrZkClient zkClient, Set<String> liveNodes, ZkStateReader stateReader) throws KeeperException, InterruptedException {
Stat stat = new Stat();
byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
null, stat, true);
return load(stat.getVersion(), state, liveNodes);
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
return load(version, bytes, liveNodes, ZkStateReader.CLUSTER_STATE);
}
/**
* Create ClusterState from json string that is typically stored in zookeeper.
*
* Use {@link ClusterState#load(SolrZkClient, Set, ZkStateReader)} instead, unless you want to
* do something more when getting the data - such as get the stat, set watch, etc.
* @param version zk version of the clusterstate.json file (bytes)
* @param bytes clusterstate.json as a byte array
* @param liveNodes list of live nodes
* @return the ClusterState
*/
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, String znode) {
// System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
if (bytes == null || bytes.length == 0) {
return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
}
Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
Map<String,DocCollection> collections = new LinkedHashMap<>(stateMap.size());
Map<String,CollectionRef> collections = new LinkedHashMap<>(stateMap.size());
for (Entry<String, Object> entry : stateMap.entrySet()) {
String collectionName = entry.getKey();
DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version);
collections.put(collectionName, coll);
DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version, znode);
collections.put(collectionName, new CollectionRef(coll));
}
// System.out.println("######## ClusterState.load result:" + collections);
return new ClusterState( version, liveNodes, collections);
return new ClusterState( liveNodes, collections,version);
}
@ -297,7 +291,7 @@ public class ClusterState implements JSONWriter.Writable {
return new Aliases(aliasMap);
}
private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version) {
private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
Map<String,Object> props;
Map<String,Slice> slices;
@ -324,7 +318,7 @@ public class ClusterState implements JSONWriter.Writable {
router = DocRouter.getDocRouter((String) routerProps.get("name"));
}
return new DocCollection(name, slices, props, router, version);
return new DocCollection(name, slices, props, router, version, znode);
}
private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) {
@ -344,7 +338,28 @@ public class ClusterState implements JSONWriter.Writable {
@Override
public void write(JSONWriter jsonWriter) {
jsonWriter.write(collectionStates);
if (collectionStates.size() == 1) {
CollectionRef ref = collectionStates.values().iterator().next();
DocCollection docCollection = ref.get();
if (docCollection.getStateFormat() > 1) {
jsonWriter.write(Collections.singletonMap(docCollection.getName(), docCollection));
// serializing a single DocCollection that is persisted outside of clusterstate.json
return;
}
}
LinkedHashMap<String , DocCollection> map = new LinkedHashMap<>();
for (Entry<String, CollectionRef> e : collectionStates.entrySet()) {
// using this class check to avoid fetching from ZK in case of lazily loaded collection
if (e.getValue().getClass() == CollectionRef.class) {
// check if it is a lazily loaded collection outside of clusterstate.json
DocCollection coll = e.getValue().get();
if (coll.getStateFormat() == 1) {
map.put(coll.getName(),coll);
}
}
}
jsonWriter.write(map);
}
/**
@ -353,7 +368,7 @@ public class ClusterState implements JSONWriter.Writable {
* @return null if ClusterState was created for publication, not consumption
*/
public Integer getZkClusterStateVersion() {
return zkClusterStateVersion;
return znodeVersion;
}
@Override
@ -361,7 +376,7 @@ public class ClusterState implements JSONWriter.Writable {
final int prime = 31;
int result = 1;
result = prime * result
+ ((zkClusterStateVersion == null) ? 0 : zkClusterStateVersion.hashCode());
+ ((znodeVersion == null) ? 0 : znodeVersion.hashCode());
result = prime * result + ((liveNodes == null) ? 0 : liveNodes.hashCode());
return result;
}
@ -372,9 +387,9 @@ public class ClusterState implements JSONWriter.Writable {
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
ClusterState other = (ClusterState) obj;
if (zkClusterStateVersion == null) {
if (other.zkClusterStateVersion != null) return false;
} else if (!zkClusterStateVersion.equals(other.zkClusterStateVersion)) return false;
if (znodeVersion == null) {
if (other.znodeVersion != null) return false;
} else if (!znodeVersion.equals(other.znodeVersion)) return false;
if (liveNodes == null) {
if (other.liveNodes != null) return false;
} else if (!liveNodes.equals(other.liveNodes)) return false;
@ -390,8 +405,22 @@ public class ClusterState implements JSONWriter.Writable {
this.liveNodes = liveNodes;
}
public DocCollection getCommonCollection(String name){
return collectionStates.get(name);
/**For internal use only
*/
Map<String, CollectionRef> getCollectionStates() {
return collectionStates;
}
public static class CollectionRef {
private final DocCollection coll;
public CollectionRef(DocCollection coll) {
this.coll = coll;
}
public DocCollection get(){
return coll;
}
}

View File

@ -34,12 +34,14 @@ import org.noggit.JSONWriter;
public class DocCollection extends ZkNodeProps {
public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards";
private int version;
public static final String STATE_FORMAT = "stateFormat";
private int znodeVersion;
private final String name;
private final Map<String, Slice> slices;
private final Map<String, Slice> activeSlices;
private final DocRouter router;
private final String znode;
private final Integer replicationFactor;
private final Integer maxShardsPerNode;
@ -47,7 +49,7 @@ public class DocCollection extends ZkNodeProps {
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
this(name, slices, props, router, -1);
this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE);
}
/**
@ -55,9 +57,9 @@ public class DocCollection extends ZkNodeProps {
* @param slices The logical shards of the collection. This is used directly and a copy is not made.
* @param props The properties of the slice. This is used directly and a copy is not made.
*/
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion) {
super( props==null ? props = new HashMap<String,Object>() : props);
this.version = zkVersion;
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) {
super(props==null ? props = new HashMap<String,Object>() : props);
this.znodeVersion = zkVersion;
this.name = name;
this.slices = slices;
@ -89,10 +91,17 @@ public class DocCollection extends ZkNodeProps {
this.activeSlices.put(slice.getKey(), slice.getValue());
}
this.router = router;
this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
assert name != null && slices != null;
}
/**Use this to make an exact copy of DocCollection with a new set of Slices and every other property as is
* @param slices the new set of Slices
* @return the resulting DocCollection
*/
public DocCollection copyWithSlices(Map<String, Slice> slices){
return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
}
/**
* Return collection name.
@ -134,10 +143,13 @@ public class DocCollection extends ZkNodeProps {
return activeSlices;
}
public int getVersion(){
return version;
public int getZNodeVersion(){
return znodeVersion;
}
public int getStateFormat() {
return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1 : 2;
}
/**
* @return replication factor for this collection or null if no
* replication factor exists.
@ -157,6 +169,10 @@ public class DocCollection extends ZkNodeProps {
return maxShardsPerNode;
}
public String getZNode(){
return znode;
}
public DocRouter getRouter() {
return router;
@ -174,4 +190,12 @@ public class DocCollection extends ZkNodeProps {
all.put(SHARDS, slices);
jsonWriter.write(all);
}
public Replica getReplica(String coreNodeName) {
for (Slice slice : slices.values()) {
Replica replica = slice.getReplica(coreNodeName);
if (replica != null) return replica;
}
return null;
}
}

View File

@ -38,12 +38,14 @@ import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@ -100,9 +102,14 @@ public class ZkStateReader implements Closeable {
public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
public static final String SHARD_LEADERS_ZKNODE = "leaders";
private final Set<String> watchedCollections = new HashSet<String>();
/**These are collections which are actively watched by this instance .
*
*/
private Map<String , DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
//
// convenience methods... should these go somewhere else?
//
@ -163,7 +170,8 @@ public class ZkStateReader implements Closeable {
log.info("path={} {}={} specified config exists in ZooKeeper",
new Object[] {path, CONFIGNAME_PROP, configName});
}
} else {
throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
}
}
catch (KeeperException e) {
@ -252,22 +260,26 @@ public class ZkStateReader implements Closeable {
return aliases;
}
/*public Boolean checkValid(String coll, int version){
public Boolean checkValid(String coll, int version) {
DocCollection collection = clusterState.getCollectionOrNull(coll);
if(collection ==null) return null;
if(collection.getVersion() < version){
log.info("server older than client {}<{}",collection.getVersion(),version);
DocCollection nu = getExternCollectionFresh(this, coll);
if(nu.getVersion()> collection.getVersion()){
updateExternCollection(nu);
if (collection == null) return null;
if (collection.getZNodeVersion() < version) {
log.debug("server older than client {}<{}", collection.getZNodeVersion(), version);
DocCollection nu = getCollectionLive(this, coll);
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
updateWatchedCollection(nu);
collection = nu;
}
}
if(collection.getVersion() == version) return Boolean.TRUE;
log.info("wrong version from client {}!={} ",version, collection.getVersion());
if (collection.getZNodeVersion() == version) {
return Boolean.TRUE;
}
log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion());
return Boolean.FALSE;
}*/
}
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
InterruptedException {
@ -296,18 +308,9 @@ public class ZkStateReader implements Closeable {
synchronized (ZkStateReader.this.getUpdateLock()) {
// remake watch
final Watcher thisWatch = this;
Stat stat = new Stat();
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
true);
Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln);
// update volatile
ZkStateReader.this.clusterState = clusterState;
// HashSet<String> all = new HashSet<>(colls);;
// all.addAll(clusterState.getAllInternalCollections());
// all.remove(null);
ZkStateReader.this.clusterState = constructState(ln, thisWatch);
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
@ -376,8 +379,7 @@ public class ZkStateReader implements Closeable {
Set<String> liveNodeSet = new HashSet<>();
liveNodeSet.addAll(liveNodes);
ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
this.clusterState = clusterState;
this.clusterState = constructState(liveNodeSet, null);
zkClient.exists(ALIASES,
new Watcher() {
@ -423,8 +425,68 @@ public class ZkStateReader implements Closeable {
}, true);
}
updateAliases();
//on reconnect of SolrZkClient re-add watchers for the watched external collections
synchronized (this) {
for (String watchedCollection : watchedCollections) {
addZkWatch(watchedCollection);
}
}
}
private ClusterState constructState(Set<String> ln, Watcher watcher)
throws KeeperException, InterruptedException {
Stat stat = new Stat();
byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
ClusterState loadedData = ClusterState.load(stat.getVersion(), data, ln,
CLUSTER_STATE);
Map<String,ClusterState.CollectionRef> result = new LinkedHashMap<>();
result.putAll(loadedData.getCollectionStates());// first load all
// collections in
// clusterstate.json
for (String s : getIndividualColls()) {
DocCollection watched = watchedCollectionStates.get(s);
if (watched != null) {
// if it is a watched collection, add too
result.put(s, new ClusterState.CollectionRef(watched));
} else {
// if it is not collection, then just create a reference which can fetch
// the collection object just in time from ZK
final String collName = s;
result.put(s, new ClusterState.CollectionRef(null) {
@Override
public DocCollection get() {
return getCollectionLive(ZkStateReader.this, collName);
}
});
}
}
return new ClusterState(ln, result, stat.getVersion());
}
private Set<String> getIndividualColls() throws KeeperException, InterruptedException {
List<String> children = null;
try {
children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
} catch (KeeperException.NoNodeException e) {
log.warn("Error fetching collection names");
return new HashSet<>();
}
if (children == null || children.isEmpty()) return new HashSet<>();
HashSet<String> result = new HashSet<>(children.size());
for (String c : children) {
try {
if (zkClient.exists(getCollectionPath(c), true)) {
result.add(c);
}
} catch (Exception e) {
log.warn("Error reading collections nodes", e);
}
}
return result;
}
// load and publish a new CollectionInfo
private synchronized void updateClusterState(boolean immediate,
@ -443,7 +505,7 @@ public class ZkStateReader implements Closeable {
if (!onlyLiveNodes) {
log.debug("Updating cloud state from ZooKeeper... ");
clusterState = ClusterState.load(zkClient, liveNodesSet,this);
clusterState = constructState(liveNodesSet,null);
} else {
log.debug("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
clusterState = this.clusterState;
@ -451,6 +513,11 @@ public class ZkStateReader implements Closeable {
}
this.clusterState = clusterState;
}
synchronized (ZkStateReader.this) {
for (String watchedCollection : watchedCollections) {
updateWatchedCollection(getCollectionLive(this, watchedCollection));
}
}
} else {
if (clusterStateUpdateScheduled) {
@ -475,13 +542,12 @@ public class ZkStateReader implements Closeable {
if (!onlyLiveNodes) {
log.debug("Updating cloud state from ZooKeeper... ");
clusterState = ClusterState.load(zkClient, liveNodesSet,ZkStateReader.this);
clusterState = constructState(liveNodesSet,null);
} else {
log.debug("Updating live nodes from ZooKeeper... ");
clusterState = ZkStateReader.this.clusterState;
clusterState.setLiveNodes(liveNodesSet);
}
ZkStateReader.this.clusterState = clusterState;
@ -504,13 +570,18 @@ public class ZkStateReader implements Closeable {
}
// update volatile
ZkStateReader.this.clusterState = clusterState;
synchronized (ZkStateReader.this) {
for (String watchedCollection : watchedCollections) {
updateWatchedCollection(getCollectionLive(ZkStateReader.this, watchedCollection));
}
}
}
}
}, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
}
}
/**
* @return information about the cluster from ZooKeeper
*/
@ -679,4 +750,131 @@ public class ZkStateReader implements Closeable {
}
}
public static DocCollection getCollectionLive(ZkStateReader zkStateReader,
String coll) {
String collectionPath = getCollectionPath(coll);
try {
if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null;
Stat stat = new Stat();
byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
ClusterState state = ClusterState.load(stat.getVersion(), data,
Collections.<String> emptySet(), collectionPath);
ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
return collectionRef == null ? null : collectionRef.get();
} catch (KeeperException.NoNodeException e) {
log.warn("No node available : " + collectionPath, e);
return null;
} catch (KeeperException e) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Could not load collection from ZK:" + coll, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.BAD_REQUEST,
"Could not load collection from ZK:" + coll, e);
}
}
public static String getCollectionPath(String coll) {
return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
}
public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
synchronized (this) {
if (watchedCollections.contains(coll)) return;
else {
watchedCollections.add(coll);
}
addZkWatch(coll);
}
}
private void addZkWatch(final String coll) throws KeeperException,
InterruptedException {
log.info("addZkWatch {}", coll);
final String fullpath = getCollectionPath(coll);
synchronized (getUpdateLock()) {
cmdExecutor.ensureExists(fullpath, zkClient);
log.info("Updating collection state at {} from ZooKeeper... ", fullpath);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
// session events are not change events,
// and do not remove the watcher
if (EventType.None.equals(event.getType())) {
return;
}
log.info("A cluster state change: {}, has occurred - updating... ",
(event), ZkStateReader.this.clusterState == null ? 0
: ZkStateReader.this.clusterState.getLiveNodes().size());
try {
// delayed approach
// ZkStateReader.this.updateClusterState(false, false);
synchronized (ZkStateReader.this.getUpdateLock()) {
if (!watchedCollections.contains(coll)) {
log.info("Unwatched collection {}", coll);
return;
}
// remake watch
final Watcher thisWatch = this;
Stat stat = new Stat();
byte[] data = zkClient.getData(fullpath, thisWatch, stat, true);
if (data == null || data.length == 0) {
log.warn("No value set for collection state : {}", coll);
return;
}
ClusterState clusterState = ClusterState.load(stat.getVersion(),
data, Collections.<String> emptySet(), fullpath);
// update volatile
DocCollection newState = clusterState.getCollectionStates()
.get(coll).get();
updateWatchedCollection(newState);
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("Unwatched collection :" + coll, e);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Unwatched collection :" + coll, e);
return;
}
}
};
zkClient.exists(fullpath, watcher, true);
}
updateWatchedCollection(getCollectionLive(this, coll));
}
private void updateWatchedCollection(DocCollection newState) {
watchedCollectionStates.put(newState.getName(), newState);
log.info("Updating data for {} to ver {} ", newState.getName(),
newState.getZNodeVersion());
this.clusterState = clusterState.copyWith(Collections.singletonMap(
newState.getName(), newState));
}
/** This is not a public API. Only used by ZkController */
public void removeZKWatch(final String coll) {
synchronized (this) {
watchedCollections.remove(coll);
clusterState = clusterState.copyWith(Collections
.<String,DocCollection> singletonMap(coll, null));
}
}
}

View File

@ -20,7 +20,6 @@ package org.apache.solr.client.solrj.impl;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@ -51,7 +49,6 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@ -62,7 +59,6 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -125,6 +121,7 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase {
@Override
public void doTest() throws Exception {
allTests();
stateVersionParamTest();
}
private void allTests() throws Exception {
@ -345,7 +342,77 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase {
SolrInputDocument doc = getDoc(fields);
indexDoc(doc);
}
private void stateVersionParamTest() throws Exception {
CloudSolrServer client = createCloudClient(null);
try {
String collectionName = "checkStateVerCol";
createCollection(collectionName, client, 2, 2);
waitForRecoveriesToFinish(collectionName, false);
DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
HttpSolrServer httpSolrServer = new HttpSolrServer(r.getStr(ZkStateReader.BASE_URL_PROP) + "/"+collectionName);
SolrQuery q = new SolrQuery().setQuery("*:*");
log.info("should work query, result {}", httpSolrServer.query(q));
//no problem
q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
log.info("2nd query , result {}", httpSolrServer.query(q));
//no error yet good
q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+ (coll.getZNodeVersion() -1)); //an older version expect error
HttpSolrServer.RemoteSolrException sse = null;
try {
httpSolrServer.query(q);
log.info("expected query error");
} catch (HttpSolrServer.RemoteSolrException e) {
sse = e;
}
httpSolrServer.shutdown();
assertNotNull(sse);
assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
//now send the request to another node that does n ot serve the collection
Set<String> allNodesOfColl = new HashSet<>();
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
}
}
String theNode = null;
for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
String n = client.getZkStateReader().getBaseUrlForNodeName(s);
if(!allNodesOfColl.contains(s)){
theNode = n;
break;
}
}
log.info("thenode which does not serve this collection{} ",theNode);
assertNotNull(theNode);
httpSolrServer = new HttpSolrServer(theNode + "/"+collectionName);
q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
try {
httpSolrServer.query(q);
log.info("error was expected");
} catch (HttpSolrServer.RemoteSolrException e) {
sse = e;
}
httpSolrServer.shutdown();
assertNotNull(sse);
assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
} finally {
client.shutdown();
}
}
public void testShutdown() throws MalformedURLException {
CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332");
try {

View File

@ -61,6 +61,7 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -342,6 +343,19 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
return createJettys(numJettys, false);
}
protected int defaultStateFormat = 1 + random().nextInt(2);
protected int getStateFormat() {
String stateFormat = System.getProperty("tests.solr.stateFormat", null);
if (stateFormat != null) {
if ("2".equals(stateFormat)) {
return defaultStateFormat = 2;
} else if ("1".equals(stateFormat)) {
return defaultStateFormat = 1;
}
}
return defaultStateFormat; // random
}
/**
* @param checkCreatedVsState
@ -354,6 +368,18 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
List<SolrServer> clients = new ArrayList<>();
StringBuilder sb = new StringBuilder();
if (getStateFormat() == 2) {
log.info("Creating collection1 with stateFormat=2");
SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(),
AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT);
Overseer.getInQueue(zkClient).offer(
ZkStateReader.toJSON(ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATECOLLECTION, "name",
DEFAULT_COLLECTION, "numShards", String.valueOf(sliceCount),
DocCollection.STATE_FORMAT, getStateFormat())));
zkClient.close();
}
for (int i = 1; i <= numJettys; i++) {
if (sb.length() > 0) sb.append(',');
int cnt = this.jettyIntCntr.incrementAndGet();
@ -1501,6 +1527,10 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
collectionInfos.put(collectionName, list);
}
params.set("name", collectionName);
if (getStateFormat() == 2) {
log.info("Creating collection with stateFormat=2: " + collectionName);
params.set(DocCollection.STATE_FORMAT, "2");
}
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");