diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index e7b01712d91..11fb5d66b4f 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -144,6 +144,13 @@ New Features * SOLR-5098: Schema API: Add REST support for adding field types to the schema. (Timothy Potter) +* SOLR-5473 : Split clusterstate.json per collection and watch states selectively + (Noble Paul, Mark Miller, shalin, Jessica Cheng Mallet, Timothy Potter, Ashum Gupta) + +* SOLR-5474 : Support for SOLR-5473 in SolrJ (Timothy Potter, Noble Paul, Mark Miller) + +* SOLR-5810 : Support for SOLR-5473 in solr admin UI (Timothy Potter, Noble Paul) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 960b5283ce8..ea092d91ee7 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -108,6 +108,8 @@ public class Overseer implements Closeable { private Map clusterProps; private boolean isClosed = false; + private final Map updateNodes = new LinkedHashMap<>(); + private boolean isClusterStateModified = false; public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) { @@ -261,6 +263,7 @@ public class Overseer implements Closeable { stateUpdateQueue.poll(); if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break; + if(!updateNodes.isEmpty()) break; // if an event comes in the next 100ms batch it together head = stateUpdateQueue.peek(100); } @@ -300,8 +303,30 @@ public class Overseer implements Closeable { TimerContext timerContext = stats.time("update_state"); boolean success = false; try { - zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true); - lastUpdatedTime = System.nanoTime(); + if (!updateNodes.isEmpty()) { + for (Entry e : updateNodes.entrySet()) { + if (e.getValue() == null) { + if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true); + } else { + byte[] data = ZkStateReader.toJSON(e.getValue()); + if (zkClient.exists(e.getKey(), true)) { + log.info("going to update_collection {}", e.getKey()); + zkClient.setData(e.getKey(), data, true); + } else { + log.info("going to create_collection {}", e.getKey(), new String(data)); + zkClient.create(e.getKey(), data, CreateMode.PERSISTENT, true); + } + } + } + updateNodes.clear(); + } + + if (isClusterStateModified) { + lastUpdatedTime = System.nanoTime(); + zkClient.setData(ZkStateReader.CLUSTER_STATE, + ZkStateReader.toJSON(clusterState), true); + isClusterStateModified = false; + } success = true; } finally { timerContext.stop(); @@ -703,7 +728,7 @@ public class Overseer implements Closeable { } Slice slice = clusterState.getSlice(collection, sliceName); - + Map replicaProps = new LinkedHashMap<>(); replicaProps.putAll(message.getProperties()); @@ -721,7 +746,7 @@ public class Overseer implements Closeable { replicaProps.remove(ZkStateReader.SHARD_ID_PROP); replicaProps.remove(ZkStateReader.COLLECTION_PROP); replicaProps.remove(QUEUE_OPERATION); - + // remove any props with null values Set> entrySet = replicaProps.entrySet(); List removeKeys = new ArrayList<>(); @@ -869,10 +894,22 @@ public class Overseer implements Closeable { } collectionProps.put(DocCollection.DOC_ROUTER, routerSpec); - if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true"); - DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router); - return newState(state, singletonMap(newCollection.getName(), newCollection)); + if (message.getStr("fromApi") == null) { + collectionProps.put("autoCreated", "true"); } + + String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null + : ZkStateReader.getCollectionPath(collectionName); + + DocCollection newCollection = new DocCollection(collectionName, + newSlices, collectionProps, router, -1, znode); + + isClusterStateModified = true; + + log.info("state version {} {}", collectionName, newCollection.getStateFormat()); + + return newState(state, singletonMap(newCollection.getName(), newCollection)); + } /* * Return an already assigned id or null if not assigned @@ -909,30 +946,28 @@ public class Overseer implements Closeable { } return null; } - + private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) { // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates())); // System.out.println("Updating slice:" + slice); - + DocCollection newCollection = null; DocCollection coll = state.getCollectionOrNull(collectionName) ; Map slices; - Map props; - DocRouter router; - + if (coll == null) { // when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself // without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router. - slices = new HashMap<>(1); - props = new HashMap<>(1); + slices = new LinkedHashMap<>(1); + slices.put(slice.getName(), slice); + Map props = new HashMap<>(1); props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME)); - router = new ImplicitDocRouter(); + newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter()); } else { - props = coll.getProperties(); - router = coll.getRouter(); slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy + slices.put(slice.getName(), slice); + newCollection = coll.copyWithSlices(slices); } - slices.put(slice.getName(), slice); - DocCollection newCollection = new DocCollection(collectionName, slices, props, router); + // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections)); @@ -991,27 +1026,52 @@ public class Overseer implements Closeable { } - DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter()); + DocCollection newCollection = coll.copyWithSlices(slices); return newState(state, singletonMap(collectionName, newCollection)); } - private ClusterState newState(ClusterState state, Map colls) { - return state.copyWith(colls); - } - - /* - * Remove collection from cloudstate - */ - private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) { - final String collection = message.getStr("name"); - if (!checkKeyExistence(message, "name")) return clusterState; - DocCollection coll = clusterState.getCollectionOrNull(collection); - if(coll !=null) { - return clusterState.copyWith(singletonMap(collection,(DocCollection)null)); + private ClusterState newState(ClusterState state, Map colls) { + for (Entry e : colls.entrySet()) { + DocCollection c = e.getValue(); + if (c == null) { + isClusterStateModified = true; + state = state.copyWith(singletonMap(e.getKey(), (DocCollection) null)); + updateNodes.put(ZkStateReader.getCollectionPath(e.getKey()) ,null); + continue; } - return clusterState; - } + if (c.getStateFormat() > 1) { + updateNodes.put(ZkStateReader.getCollectionPath(c.getName()), + new ClusterState(-1, Collections.emptySet(), singletonMap(c.getName(), c))); + } else { + isClusterStateModified = true; + } + state = state.copyWith(singletonMap(e.getKey(), c)); + + } + return state; + } + + /* + * Remove collection from cloudstate + */ + private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) { + final String collection = message.getStr("name"); + if (!checkKeyExistence(message, "name")) return clusterState; + DocCollection coll = clusterState.getCollectionOrNull(collection); + if(coll == null) return clusterState; + + isClusterStateModified = true; + if (coll.getStateFormat() > 1) { + try { + log.info("Deleting state for collection : {}", collection); + zkClient.delete(ZkStateReader.getCollectionPath(collection), -1, true); + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to remove collection state :" + collection); + } + } + return newState(clusterState, singletonMap(coll.getName(),(DocCollection) null)); + } /* * Remove collection slice from cloudstate */ @@ -1027,7 +1087,7 @@ public class Overseer implements Closeable { Map newSlices = new LinkedHashMap<>(coll.getSlicesMap()); newSlices.remove(sliceId); - DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter()); + DocCollection newCollection = coll.copyWithSlices(newSlices); return newState(clusterState, singletonMap(collection,newCollection)); } @@ -1039,8 +1099,6 @@ public class Overseer implements Closeable { final String collection = message.getStr(ZkStateReader.COLLECTION_PROP); if (!checkCollectionKeyExistence(message)) return clusterState; -// final Map newCollections = new LinkedHashMap<>(clusterState.getCollectionStates()); // shallow copy -// DocCollection coll = newCollections.get(collection); DocCollection coll = clusterState.getCollectionOrNull(collection) ; if (coll == null) { // TODO: log/error that we didn't find it? @@ -1078,7 +1136,7 @@ public class Overseer implements Closeable { newSlices.put(slice.getName(), slice); } } - + if (lastSlice) { // remove all empty pre allocated slices for (Slice slice : coll.getSlices()) { @@ -1095,7 +1153,7 @@ public class Overseer implements Closeable { // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or // ZkController out of the Overseer. try { - zkClient.clean("/collections/" + collection); + zkClient.delete("/collections/" + collection, -1, true); } catch (InterruptedException e) { SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e); Thread.currentThread().interrupt(); @@ -1105,8 +1163,8 @@ public class Overseer implements Closeable { return newState(clusterState,singletonMap(collection, (DocCollection) null)); } else { - DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter()); - return newState(clusterState,singletonMap(collection,newCollection)); + DocCollection newCollection = coll.copyWithSlices(newSlices); + return newState(clusterState,singletonMap(collection,newCollection)); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 7676e881894..1ee13ff1f96 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -698,7 +698,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { // convert cluster state into a map of writable types byte[] bytes = ZkStateReader.toJSON(clusterState); - Map stateMap = (Map) ZkStateReader.fromJSON(bytes); + Map stateMap = (Map) ZkStateReader.fromJSON(bytes); String shard = message.getStr(ZkStateReader.SHARD_ID_PROP); NamedList collectionProps = new SimpleOrderedMap(); @@ -706,7 +706,13 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { Set collections = clusterState.getCollections(); for (String name : collections) { Map collectionStatus = null; - collectionStatus = getCollectionStatus((Map) stateMap.get(name), name, shard); + if (clusterState.getCollection(name).getStateFormat() > 1) { + bytes = ZkStateReader.toJSON(clusterState.getCollection(name)); + Map docCollection = (Map) ZkStateReader.fromJSON(bytes); + collectionStatus = getCollectionStatus(docCollection, name, shard); + } else { + collectionStatus = getCollectionStatus((Map) stateMap.get(name), name, shard); + } if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) { collectionStatus.put("aliases", collectionVsAliases.get(name)); } @@ -715,8 +721,12 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } else { String routeKey = message.getStr(ShardParams._ROUTE_); Map docCollection = null; - - docCollection = (Map) stateMap.get(collection); + if (clusterState.getCollection(collection).getStateFormat() > 1) { + bytes = ZkStateReader.toJSON(clusterState.getCollection(collection)); + docCollection = (Map) ZkStateReader.fromJSON(bytes); + } else { + docCollection = (Map) stateMap.get(collection); + } if (routeKey == null) { Map collectionStatus = getCollectionStatus(docCollection, collection, shard); if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty()) { @@ -2409,11 +2419,15 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } - if(configName!= null){ - log.info("creating collections conf node {} ",ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll); - zkStateReader.getZkClient().makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll, - ZkStateReader.toJSON(ZkNodeProps.makeMap(ZkController.CONFIGNAME_PROP,configName)),true ); - + if (configName != null) { + String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll; + log.info("creating collections conf node {} ", collDir); + byte[] data = ZkStateReader.toJSON(ZkNodeProps.makeMap(ZkController.CONFIGNAME_PROP, configName)); + if (zkStateReader.getZkClient().exists(collDir, true)) { + zkStateReader.getZkClient().setData(collDir, data, true); + } else { + zkStateReader.getZkClient().makePath(collDir, data, true); + } } else { if(isLegacyCloud){ log.warn("Could not obtain config name"); diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index c5b908db5ae..5ff39836d5b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1165,6 +1165,19 @@ public final class ZkController { } CloudDescriptor cloudDescriptor = cd.getCloudDescriptor(); + boolean removeWatch = true; + // if there is no SolrCore which is a member of this collection, remove the watch + for (SolrCore solrCore : cc.getCores()) { + CloudDescriptor cloudDesc = solrCore.getCoreDescriptor() + .getCloudDescriptor(); + if (cloudDesc != null + && cloudDescriptor.getCollectionName().equals( + cloudDesc.getCollectionName())) { + removeWatch = false; + break; + } + } + if (removeWatch) zkStateReader.removeZKWatch(collection); ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.NODE_NAME_PROP, getNodeName(), @@ -1466,6 +1479,11 @@ public final class ZkController { } publish(cd, ZkStateReader.DOWN, false, true); + DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName()); + if(collection !=null && collection.getStateFormat()>1 ){ + log.info("Registering watch for external collection {}",cd.getCloudDescriptor().getCollectionName()); + zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName()); + } } catch (KeeperException e) { log.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 5ba6bd3be07..14ba320bec5 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -24,6 +24,7 @@ import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATESHARD; import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET; import static org.apache.solr.cloud.OverseerCollectionProcessor.DELETEREPLICA; import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES; +import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR; import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID; import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER; import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP; @@ -59,6 +60,7 @@ import org.apache.solr.cloud.OverseerSolrResponse; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; @@ -142,7 +144,7 @@ public class CollectionsHandler extends RequestHandlerBase { if (action == null) { throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a); } - + switch (action) { case CREATE: { this.handleCreateAction(req, rsp); @@ -468,15 +470,16 @@ public class CollectionsHandler extends RequestHandlerBase { Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.CREATECOLLECTION, "fromApi","true"); - copyIfNotNull(req.getParams(), props, - "name", - ZkStateReader.REPLICATION_FACTOR, + copyIfNotNull(req.getParams(),props, + "name", + REPLICATION_FACTOR, COLL_CONF, NUM_SLICES, MAX_SHARDS_PER_NODE, CREATE_NODE_SET, SHARDS_PROP, ASYNC, + DocCollection.STATE_FORMAT, AUTO_ADD_REPLICAS, "router."); @@ -670,4 +673,9 @@ public class CollectionsHandler extends RequestHandlerBase { public String getDescription() { return "Manage SolrCloud Collections"; } + + @Override + public String getSource() { + return "$URL: https://svn.apache.org/repos/asf/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionHandler.java $"; + } } diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index ab2c0f38586..ff4959dd4a1 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -318,6 +318,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { String coreUrl = getRemotCoreUrl(cores, corename, origCorename); // don't proxy for internal update requests SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString()); + checkStateIsValid(cores, queryParams.get(CloudSolrServer.STATE_VERSION)); if (coreUrl != null && queryParams .get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) { @@ -373,6 +374,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { if( "/select".equals( path ) || "/select/".equals( path ) ) { solrReq = parser.parse( core, path, req ); + checkStateIsValid(cores,solrReq.getParams().get(CloudSolrServer.STATE_VERSION)); String qt = solrReq.getParams().get( CommonParams.QT ); handler = core.getRequestHandler( qt ); if( handler == null ) { @@ -462,6 +464,22 @@ public class SolrDispatchFilter extends BaseSolrFilter { chain.doFilter(request, response); } + private void checkStateIsValid(CoreContainer cores, String stateVer) { + if (stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware()) { + // many have multiple collections separated by | + String[] pairs = StringUtils.split(stateVer, '|'); + for (String pair : pairs) { + String[] pcs = StringUtils.split(pair, ':'); + if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) { + Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0], Integer.parseInt(pcs[1])); + + if (Boolean.TRUE != status) { + throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair + "valid : " + status); + } + } + } + } + } private void processAliases(SolrQueryRequest solrReq, Aliases aliases, List collectionsList) { diff --git a/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java b/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java index e5c2e540d95..30aa9cddbc0 100644 --- a/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java +++ b/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java @@ -24,7 +24,6 @@ import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -35,9 +34,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.lucene.util.BytesRef; -import org.noggit.CharArr; -import org.noggit.JSONWriter; -import org.noggit.ObjectBuilder; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; @@ -46,14 +42,15 @@ import org.apache.solr.common.params.SolrParams; import org.apache.solr.core.CoreContainer; import org.apache.solr.util.FastWriter; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; import org.noggit.CharArr; import org.noggit.JSONWriter; +import org.noggit.ObjectBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Zookeeper Info * @@ -90,7 +87,6 @@ public final class ZookeeperInfoServlet extends HttpServlet { String path = params.get("path"); String addr = params.get("addr"); - boolean all = "true".equals(params.get("all")); if (addr != null && addr.length() == 0) { addr = null; @@ -110,6 +106,7 @@ public final class ZookeeperInfoServlet extends HttpServlet { ZKPrinter printer = new ZKPrinter(response, out, cores.getZkController(), addr); printer.detail = detail; printer.dump = dump; + printer.isTreeView = (params.get("wt") == null); // this is hacky but tree view requests don't come in with the wt set try { printer.print(path); @@ -140,6 +137,8 @@ public final class ZookeeperInfoServlet extends HttpServlet { boolean detail = false; boolean dump = false; + boolean isTreeView = false; + String addr; // the address passed to us String keeperAddr; // the address we're connected to @@ -385,6 +384,47 @@ public final class ZookeeperInfoServlet extends HttpServlet { dataStrErr = "data is not parsable as a utf8 String: " + e.toString(); } } + // pull in external collections too + if (ZkStateReader.CLUSTER_STATE.equals(path) && !isTreeView) { + SortedMap collectionStates = null; + List children = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, null, true); + java.util.Collections.sort(children); + for (String collection : children) { + String collStatePath = ZkStateReader.getCollectionPath(collection); + String childDataStr = null; + try { + byte[] childData = zkClient.getData(collStatePath, null, null, true); + if (childData != null) { + childDataStr = (new BytesRef(childData)).utf8ToString(); + } + } catch (KeeperException.NoNodeException nne) { + // safe to ignore + } catch (Exception childErr) { + log.error("Failed to get "+collStatePath+" due to: "+childErr); + } + + if (childDataStr != null) { + if (collectionStates == null) { + // initialize lazily as there may not be any external collections + collectionStates = new TreeMap<>(); + + // add the internal collections + if (dataStr != null) + collectionStates.putAll((Map)ObjectBuilder.fromJSON(dataStr)); + } + + // now add in the external collections + Map extColl = (Map)ObjectBuilder.fromJSON(childDataStr); + collectionStates.put(collection, extColl.get(collection)); + } + } + + if (collectionStates != null) { + CharArr out = new CharArr(); + new JSONWriter(out, 2).write(collectionStates); + dataStr = out.toString(); + } + } json.writeString("znode"); json.writeNameSeparator(); diff --git a/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java b/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java new file mode 100644 index 00000000000..2f5a694260f --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java @@ -0,0 +1,115 @@ +package org.apache.solr.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.solr.client.solrj.impl.CloudSolrServer; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.zookeeper.data.Stat; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; + +public class ExternalCollectionsTest extends AbstractFullDistribZkTestBase { + private CloudSolrServer client; + + @BeforeClass + public static void beforeThisClass2() throws Exception { + + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + System.setProperty("numShards", Integer.toString(sliceCount)); + System.setProperty("solr.xml.persist", "true"); + client = createCloudClient(null); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + client.shutdown(); + } + + protected String getSolrXml() { + return "solr-no-core.xml"; + } + + public ExternalCollectionsTest() { + fixShardCount = true; + + sliceCount = 2; + shardCount = 4; + + checkCreatedVsState = false; + } + + + @Override + public void doTest() throws Exception { + testZkNodeLocation(); + } + + + + @Override + protected int getStateFormat() { + return 2; + } + + private void testZkNodeLocation() throws Exception{ + + String collectionName = "myExternColl"; + + createCollection(collectionName, client, 2, 2); + + waitForRecoveriesToFinish(collectionName, false); + assertTrue("does not exist collection state externally", + cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true)); + Stat stat = new Stat(); + byte[] data = cloudClient.getZkStateReader().getZkClient().getData(ZkStateReader.getCollectionPath(collectionName), null, stat, true); + DocCollection c = ZkStateReader.getCollectionLive(cloudClient.getZkStateReader(), collectionName); + ClusterState clusterState = cloudClient.getZkStateReader().getClusterState(); + assertEquals("The zkversion of the nodes must be same zkver:" + stat.getVersion() , stat.getVersion(),clusterState.getCollection(collectionName).getZNodeVersion() ); + assertTrue("DocCllection#getStateFormat() must be > 1", cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getStateFormat() > 1); + + + // remove collection + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set("action", CollectionParams.CollectionAction.DELETE.toString()); + params.set("name", collectionName); + QueryRequest request = new QueryRequest(params); + request.setPath("/admin/collections"); + if (client == null) { + client = createCloudClient(null); + } + + client.request(request); + + checkForMissingCollection(collectionName); + assertFalse("collection state should not exist externally", cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true)); + + } +} + diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java index 91d40760567..f83611def4f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java @@ -18,6 +18,8 @@ package org.apache.solr.client.solrj.impl; */ import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -29,13 +31,16 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; +import org.apache.http.NoHttpResponseException; import org.apache.http.client.HttpClient; +import org.apache.http.conn.ConnectTimeoutException; import org.apache.solr.client.solrj.ResponseParser; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServer; @@ -66,6 +71,8 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.solr.common.util.StrUtils; import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * SolrJ client class to communicate with SolrCloud. @@ -78,6 +85,8 @@ import org.apache.zookeeper.KeeperException; * with {@link #setIdField(String)}. */ public class CloudSolrServer extends SolrServer { + private static final Logger log = LoggerFactory.getLogger(CloudSolrServer.class); + private volatile ZkStateReader zkStateReader; private String zkHost; // the zk server address private int zkConnectTimeout = 10000; @@ -86,6 +95,8 @@ public class CloudSolrServer extends SolrServer { private final LBHttpSolrServer lbServer; private final boolean shutdownLBHttpSolrServer; private HttpClient myClient; + //no of times collection state to be reloaded if stale state error is received + private static final int MAX_STALE_RETRIES = 5; Random rand = new Random(); private final boolean updatesToLeaders; @@ -94,6 +105,7 @@ public class CloudSolrServer extends SolrServer { .newCachedThreadPool(new SolrjNamedThreadFactory( "CloudSolrServer ThreadPool")); private String idField = "id"; + public static final String STATE_VERSION = "_stateVer_"; private final Set NON_ROUTABLE_PARAMS; { NON_ROUTABLE_PARAMS = new HashSet<>(); @@ -111,6 +123,36 @@ public class CloudSolrServer extends SolrServer { // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK); } + private volatile long timeToLive = 60* 1000L; + + + protected Map collectionStateCache = new ConcurrentHashMap(){ + @Override + public ExpiringCachedDocCollection get(Object key) { + ExpiringCachedDocCollection val = super.get(key); + if(val == null) return null; + if(val.isExpired(timeToLive)) { + super.remove(key); + return null; + } + return val; + } + + }; + + class ExpiringCachedDocCollection { + DocCollection cached; + long cachedAt; + + ExpiringCachedDocCollection(DocCollection cached) { + this.cached = cached; + this.cachedAt = System.currentTimeMillis(); + } + + boolean isExpired(long timeToLive) { + return (System.currentTimeMillis() - cachedAt) > timeToLive; + } + } /** * Create a new client object that connects to Zookeeper and is always aware @@ -133,7 +175,15 @@ public class CloudSolrServer extends SolrServer { * "zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181" */ public CloudSolrServer(String zkHost) { - this(zkHost, true); + this.zkHost = zkHost; + this.myClient = HttpClientUtil.createClient(null); + this.lbServer = new LBHttpSolrServer(myClient); + this.lbServer.setRequestWriter(new BinaryRequestWriter()); + this.lbServer.setParser(new BinaryResponseParser()); + this.updatesToLeaders = true; + shutdownLBHttpSolrServer = true; + setupStateVerParamOnQueryString(lbServer); + } /** @@ -151,6 +201,15 @@ public class CloudSolrServer extends SolrServer { this.lbServer.setParser(new BinaryResponseParser()); this.updatesToLeaders = updatesToLeaders; shutdownLBHttpSolrServer = true; + setupStateVerParamOnQueryString(lbServer); + } + + /**Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json + * @param seconds ttl value in seconds + */ + public void setCollectionCacheTTl(int seconds){ + assert seconds > 0; + timeToLive = seconds*1000L; } /** @@ -178,8 +237,24 @@ public class CloudSolrServer extends SolrServer { this.lbServer = lbServer; this.updatesToLeaders = updatesToLeaders; shutdownLBHttpSolrServer = false; + setupStateVerParamOnQueryString(lbServer); + } + /** + * Used internally to setup the _stateVer_ param to be sent in the query string of requests + * coming from this instance. + */ + protected void setupStateVerParamOnQueryString(LBHttpSolrServer lbServer) { + // setup the stateVer param to be passed in the query string of every request + Set queryStringParams = lbServer.getQueryParams(); + if (queryStringParams == null) { + queryStringParams = new HashSet(2); + lbServer.setQueryParams(queryStringParams); + } + queryStringParams.add(STATE_VERSION); + } + public ResponseParser getParser() { return lbServer.getParser(); } @@ -317,7 +392,7 @@ public class CloudSolrServer extends SolrServer { } } - DocCollection col = clusterState.getCollection(collection); + DocCollection col = getDocCollection(clusterState, collection); DocRouter router = col.getRouter(); @@ -534,7 +609,146 @@ public class CloudSolrServer extends SolrServer { } @Override - public NamedList request(SolrRequest request) + public NamedList request(SolrRequest request) throws SolrServerException, IOException { + SolrParams reqParams = request.getParams(); + String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection(); + return requestWithRetryOnStaleState(request, 0, collection); + } + + /** + * As this class doesn't watch external collections on the client side, + * there's a chance that the request will fail due to cached stale state, + * which means the state must be refreshed from ZK and retried. + */ + protected NamedList requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection) + throws SolrServerException, IOException { + + connect(); // important to call this before you start working with the ZkStateReader + + // build up a _stateVer_ param to pass to the server containing all of the + // external collection state versions involved in this request, which allows + // the server to notify us that our cached state for one or more of the external + // collections is stale and needs to be refreshed ... this code has no impact on internal collections + String stateVerParam = null; + List requestedCollections = null; + if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests + Set requestedCollectionNames = getCollectionList(getZkStateReader().getClusterState(), collection); + + StringBuilder stateVerParamBuilder = null; + for (String requestedCollection : requestedCollectionNames) { + // track the version of state we're using on the client side using the _stateVer_ param + DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection); + int collVer = coll.getZNodeVersion(); + if (coll.getStateFormat()>1) { + if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size()); + requestedCollections.add(coll); + + if (stateVerParamBuilder == null) { + stateVerParamBuilder = new StringBuilder(); + } else { + stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name + } + + stateVerParamBuilder.append(coll.getName()).append(":").append(collVer); + } + } + + if (stateVerParamBuilder != null) { + stateVerParam = stateVerParamBuilder.toString(); + } + } + + if (request.getParams() instanceof ModifiableSolrParams) { + ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); + if (stateVerParam != null) { + params.set(STATE_VERSION, stateVerParam); + } else { + params.remove(STATE_VERSION); + } + } // else: ??? how to set this ??? + + NamedList resp = null; + try { + resp = sendRequest(request); + } catch (Exception exc) { + + Throwable rootCause = SolrException.getRootCause(exc); + // don't do retry support for admin requests or if the request doesn't have a collection specified + if (collection == null || request.getPath().startsWith("/admin")) { + if (exc instanceof SolrServerException) { + throw (SolrServerException)exc; + } else if (exc instanceof IOException) { + throw (IOException)exc; + }else if (exc instanceof RuntimeException) { + throw (RuntimeException) exc; + } + else { + throw new SolrServerException(rootCause); + } + } + + int errorCode = (rootCause instanceof SolrException) ? + ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code; + + log.error("Request to collection {} failed due to ("+errorCode+ + ") {}, retry? "+retryCount, collection, rootCause.toString()); + + boolean wasCommError = + (rootCause instanceof ConnectException || + rootCause instanceof ConnectTimeoutException || + rootCause instanceof NoHttpResponseException || + rootCause instanceof SocketException); + + boolean stateWasStale = false; + if (retryCount < MAX_STALE_RETRIES && + !requestedCollections.isEmpty() && + SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE) + { + // cached state for one or more external collections was stale + // re-issue request using updated state + stateWasStale = true; + + // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence + for (DocCollection ext : requestedCollections) { + collectionStateCache.remove(ext.getName()); + } + } + + // if we experienced a communication error, it's worth checking the state + // with ZK just to make sure the node we're trying to hit is still part of the collection + if (retryCount < MAX_STALE_RETRIES && !stateWasStale && !requestedCollections.isEmpty() && wasCommError) { + for (DocCollection ext : requestedCollections) { + DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName()); + if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) { + // looks like we couldn't reach the server because the state was stale == retry + stateWasStale = true; + // we just pulled state from ZK, so update the cache so that the retry uses it + collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk)); + } + } + } + + requestedCollections.clear(); // done with this + + // if the state was stale, then we retry the request once with new state pulled from Zk + if (stateWasStale) { + log.warn("Re-trying request to collection(s) "+collection+" after stale state error from server."); + resp = requestWithRetryOnStaleState(request, retryCount+1, collection); + } else { + if (exc instanceof SolrServerException) { + throw (SolrServerException)exc; + } else if (exc instanceof IOException) { + throw (IOException)exc; + } else { + throw new SolrServerException(rootCause); + } + } + } + + return resp; + } + + protected NamedList sendRequest(SolrRequest request) throws SolrServerException, IOException { connect(); @@ -594,7 +808,7 @@ public class CloudSolrServer extends SolrServer { // add it to the Map of slices. Map slices = new HashMap<>(); for (String collectionName : collectionsList) { - DocCollection col = clusterState.getCollection(collectionName); + DocCollection col = getDocCollection(clusterState, collectionName); Collection routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col); ClientUtils.addSlices(slices, collectionName, routeSlices, true); } @@ -654,14 +868,16 @@ public class CloudSolrServer extends SolrServer { theUrlList = new ArrayList<>(urlList.size()); theUrlList.addAll(urlList); } + if(theUrlList.isEmpty()) { + throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Not enough nodes to handle the request"); + } + Collections.shuffle(theUrlList, rand); if (sendToLeaders) { ArrayList theReplicas = new ArrayList<>( replicasList.size()); theReplicas.addAll(replicasList); Collections.shuffle(theReplicas, rand); - // System.out.println("leaders:" + theUrlList); - // System.out.println("replicas:" + theReplicas); theUrlList.addAll(theReplicas); } @@ -690,10 +906,13 @@ public class CloudSolrServer extends SolrServer { collectionsList.addAll(aliasList); continue; } - - throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName); + + DocCollection docCollection = getDocCollection(clusterState, collection); + if (docCollection == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName); + } } - + collectionsList.add(collectionName); } return collectionsList; @@ -730,6 +949,19 @@ public class CloudSolrServer extends SolrServer { return updatesToLeaders; } + protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException { + ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null; + if (cachedState != null && cachedState.cached != null) { + return cachedState.cached; + } + + DocCollection col = clusterState.getCollectionOrNull(collection); + if(col == null ) return null; + collectionStateCache.put(collection, new ExpiringCachedDocCollection(col)); + return col; + } + + /** * Useful for determining the minimum achieved replication factor across * all shards involved in processing an update request, typically useful diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java index 55cb71f6f13..e669afba9a4 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java @@ -26,11 +26,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.noggit.JSONWriter; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; +import org.noggit.JSONWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +40,9 @@ import org.slf4j.LoggerFactory; public class ClusterState implements JSONWriter.Writable { private static Logger log = LoggerFactory.getLogger(ClusterState.class); - private Integer zkClusterStateVersion; + private Integer znodeVersion; - private final Map collectionStates; // Map> + private final Map collectionStates; private Set liveNodes; @@ -59,30 +57,42 @@ public class ClusterState implements JSONWriter.Writable { this(null, liveNodes, collectionStates); } - - /** * Use this constr when ClusterState is meant for consumption. */ - public ClusterState(Integer zkClusterStateVersion, Set liveNodes, + public ClusterState(Integer znodeVersion, Set liveNodes, Map collectionStates) { - this.zkClusterStateVersion = zkClusterStateVersion; - this.liveNodes = new HashSet<>(liveNodes.size()); - this.liveNodes.addAll(liveNodes); - this.collectionStates = new LinkedHashMap<>(collectionStates.size()); - this.collectionStates.putAll(collectionStates); - + this(liveNodes, getRefMap(collectionStates),znodeVersion); } + private static Map getRefMap(Map collectionStates) { + Map collRefs = new LinkedHashMap<>(collectionStates.size()); + for (Entry entry : collectionStates.entrySet()) { + final DocCollection c = entry.getValue(); + collRefs.put(entry.getKey(), new CollectionRef(c)); + } + return collRefs; + } + + /**Use this if all the collection states are not readily available and some needs to be lazily loaded + */ + public ClusterState(Set liveNodes, Map collectionStates, Integer znodeVersion){ + this.znodeVersion = znodeVersion; + this.liveNodes = new HashSet<>(liveNodes.size()); + this.liveNodes.addAll(liveNodes); + this.collectionStates = new LinkedHashMap<>(collectionStates); + } + + public ClusterState copyWith(Map modified){ - ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates); + ClusterState result = new ClusterState(liveNodes, new LinkedHashMap<>(collectionStates), znodeVersion); for (Entry e : modified.entrySet()) { DocCollection c = e.getValue(); if(c == null) { result.collectionStates.remove(e.getKey()); continue; } - result.collectionStates.put(c.getName(), c); + result.collectionStates.put(c.getName(), new CollectionRef(c)); } return result; } @@ -117,7 +127,7 @@ public class ClusterState implements JSONWriter.Writable { * coreNodeName is the same as replicaName */ public Replica getReplica(final String collection, final String coreNodeName) { - return getReplica(collectionStates.get(collection), coreNodeName); + return getReplica(getCollectionOrNull(collection), coreNodeName); } /** @@ -165,7 +175,8 @@ public class ClusterState implements JSONWriter.Writable { public DocCollection getCollectionOrNull(String coll) { - return collectionStates.get(coll); + CollectionRef ref = collectionStates.get(coll); + return ref == null? null:ref.get(); } /** @@ -175,12 +186,6 @@ public class ClusterState implements JSONWriter.Writable { return collectionStates.keySet(); } - /** - * @return Map<collectionName, Map<sliceName,Slice>> - */ - public Map getCollectionStates() { - return Collections.unmodifiableMap(collectionStates); - } /** * Get names of the currently live nodes. @@ -194,14 +199,14 @@ public class ClusterState implements JSONWriter.Writable { } public String getShardId(String collectionName, String nodeName, String coreName) { - Collection states = collectionStates.values(); + Collection states = collectionStates.values(); if (collectionName != null) { - DocCollection c = getCollectionOrNull(collectionName); - if (c != null) states = Collections.singletonList(c); + CollectionRef c = collectionStates.get(collectionName); + if (c != null) states = Collections.singletonList( c ); } - for (DocCollection coll : states) { - for (Slice slice : coll.getSlices()) { + for (CollectionRef coll : states) { + for (Slice slice : coll.get().getSlices()) { for (Replica replica : slice.getReplicas()) { // TODO: for really large clusters, we could 'index' on this String rnodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP); @@ -215,10 +220,10 @@ public class ClusterState implements JSONWriter.Writable { return null; } - public String getShardIdByCoreNodeName(String collectionName, String coreNodeName) { + /*public String getShardIdByCoreNodeName(String collectionName, String coreNodeName) { Collection states = collectionStates.values(); if (collectionName != null) { - DocCollection c = getCollectionOrNull(collectionName); + CollectionRef c = collectionStates.get(collectionName); if (c != null) states = Collections.singletonList(c); } @@ -232,7 +237,7 @@ public class ClusterState implements JSONWriter.Writable { } } return null; - } + }*/ /** * Check if node is alive. @@ -249,42 +254,31 @@ public class ClusterState implements JSONWriter.Writable { return sb.toString(); } - /** - * Create ClusterState by reading the current state from zookeeper. - */ - public static ClusterState load(SolrZkClient zkClient, Set liveNodes, ZkStateReader stateReader) throws KeeperException, InterruptedException { - Stat stat = new Stat(); - byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE, - null, stat, true); - return load(stat.getVersion(), state, liveNodes); + public static ClusterState load(Integer version, byte[] bytes, Set liveNodes) { + return load(version, bytes, liveNodes, ZkStateReader.CLUSTER_STATE); } - - /** * Create ClusterState from json string that is typically stored in zookeeper. * - * Use {@link ClusterState#load(SolrZkClient, Set, ZkStateReader)} instead, unless you want to - * do something more when getting the data - such as get the stat, set watch, etc. * @param version zk version of the clusterstate.json file (bytes) * @param bytes clusterstate.json as a byte array * @param liveNodes list of live nodes * @return the ClusterState */ - public static ClusterState load(Integer version, byte[] bytes, Set liveNodes) { + public static ClusterState load(Integer version, byte[] bytes, Set liveNodes, String znode) { // System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes))); if (bytes == null || bytes.length == 0) { return new ClusterState(version, liveNodes, Collections.emptyMap()); } Map stateMap = (Map) ZkStateReader.fromJSON(bytes); - Map collections = new LinkedHashMap<>(stateMap.size()); + Map collections = new LinkedHashMap<>(stateMap.size()); for (Entry entry : stateMap.entrySet()) { String collectionName = entry.getKey(); - DocCollection coll = collectionFromObjects(collectionName, (Map)entry.getValue(), version); - collections.put(collectionName, coll); + DocCollection coll = collectionFromObjects(collectionName, (Map)entry.getValue(), version, znode); + collections.put(collectionName, new CollectionRef(coll)); } - // System.out.println("######## ClusterState.load result:" + collections); - return new ClusterState( version, liveNodes, collections); + return new ClusterState( liveNodes, collections,version); } @@ -297,7 +291,7 @@ public class ClusterState implements JSONWriter.Writable { return new Aliases(aliasMap); } - private static DocCollection collectionFromObjects(String name, Map objs, Integer version) { + private static DocCollection collectionFromObjects(String name, Map objs, Integer version, String znode) { Map props; Map slices; @@ -324,7 +318,7 @@ public class ClusterState implements JSONWriter.Writable { router = DocRouter.getDocRouter((String) routerProps.get("name")); } - return new DocCollection(name, slices, props, router, version); + return new DocCollection(name, slices, props, router, version, znode); } private static Map makeSlices(Map genericSlices) { @@ -344,7 +338,28 @@ public class ClusterState implements JSONWriter.Writable { @Override public void write(JSONWriter jsonWriter) { - jsonWriter.write(collectionStates); + if (collectionStates.size() == 1) { + CollectionRef ref = collectionStates.values().iterator().next(); + DocCollection docCollection = ref.get(); + if (docCollection.getStateFormat() > 1) { + jsonWriter.write(Collections.singletonMap(docCollection.getName(), docCollection)); + // serializing a single DocCollection that is persisted outside of clusterstate.json + return; + } + } + + LinkedHashMap map = new LinkedHashMap<>(); + for (Entry e : collectionStates.entrySet()) { + // using this class check to avoid fetching from ZK in case of lazily loaded collection + if (e.getValue().getClass() == CollectionRef.class) { + // check if it is a lazily loaded collection outside of clusterstate.json + DocCollection coll = e.getValue().get(); + if (coll.getStateFormat() == 1) { + map.put(coll.getName(),coll); + } + } + } + jsonWriter.write(map); } /** @@ -353,7 +368,7 @@ public class ClusterState implements JSONWriter.Writable { * @return null if ClusterState was created for publication, not consumption */ public Integer getZkClusterStateVersion() { - return zkClusterStateVersion; + return znodeVersion; } @Override @@ -361,7 +376,7 @@ public class ClusterState implements JSONWriter.Writable { final int prime = 31; int result = 1; result = prime * result - + ((zkClusterStateVersion == null) ? 0 : zkClusterStateVersion.hashCode()); + + ((znodeVersion == null) ? 0 : znodeVersion.hashCode()); result = prime * result + ((liveNodes == null) ? 0 : liveNodes.hashCode()); return result; } @@ -372,9 +387,9 @@ public class ClusterState implements JSONWriter.Writable { if (obj == null) return false; if (getClass() != obj.getClass()) return false; ClusterState other = (ClusterState) obj; - if (zkClusterStateVersion == null) { - if (other.zkClusterStateVersion != null) return false; - } else if (!zkClusterStateVersion.equals(other.zkClusterStateVersion)) return false; + if (znodeVersion == null) { + if (other.znodeVersion != null) return false; + } else if (!znodeVersion.equals(other.znodeVersion)) return false; if (liveNodes == null) { if (other.liveNodes != null) return false; } else if (!liveNodes.equals(other.liveNodes)) return false; @@ -390,8 +405,22 @@ public class ClusterState implements JSONWriter.Writable { this.liveNodes = liveNodes; } - public DocCollection getCommonCollection(String name){ - return collectionStates.get(name); + /**For internal use only + */ + Map getCollectionStates() { + return collectionStates; + } + + public static class CollectionRef { + private final DocCollection coll; + + public CollectionRef(DocCollection coll) { + this.coll = coll; + } + + public DocCollection get(){ + return coll; + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index 68f400e14c7..2c4b55c40ae 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -34,12 +34,14 @@ import org.noggit.JSONWriter; public class DocCollection extends ZkNodeProps { public static final String DOC_ROUTER = "router"; public static final String SHARDS = "shards"; - private int version; + public static final String STATE_FORMAT = "stateFormat"; + private int znodeVersion; private final String name; private final Map slices; private final Map activeSlices; private final DocRouter router; + private final String znode; private final Integer replicationFactor; private final Integer maxShardsPerNode; @@ -47,7 +49,7 @@ public class DocCollection extends ZkNodeProps { public DocCollection(String name, Map slices, Map props, DocRouter router) { - this(name, slices, props, router, -1); + this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE); } /** @@ -55,9 +57,9 @@ public class DocCollection extends ZkNodeProps { * @param slices The logical shards of the collection. This is used directly and a copy is not made. * @param props The properties of the slice. This is used directly and a copy is not made. */ - public DocCollection(String name, Map slices, Map props, DocRouter router, int zkVersion) { - super( props==null ? props = new HashMap() : props); - this.version = zkVersion; + public DocCollection(String name, Map slices, Map props, DocRouter router, int zkVersion, String znode) { + super(props==null ? props = new HashMap() : props); + this.znodeVersion = zkVersion; this.name = name; this.slices = slices; @@ -89,10 +91,17 @@ public class DocCollection extends ZkNodeProps { this.activeSlices.put(slice.getKey(), slice.getValue()); } this.router = router; - + this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode; assert name != null && slices != null; } + /**Use this to make an exact copy of DocCollection with a new set of Slices and every other property as is + * @param slices the new set of Slices + * @return the resulting DocCollection + */ + public DocCollection copyWithSlices(Map slices){ + return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode); + } /** * Return collection name. @@ -134,10 +143,13 @@ public class DocCollection extends ZkNodeProps { return activeSlices; } - public int getVersion(){ - return version; + public int getZNodeVersion(){ + return znodeVersion; + } + + public int getStateFormat() { + return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1 : 2; } - /** * @return replication factor for this collection or null if no * replication factor exists. @@ -157,6 +169,10 @@ public class DocCollection extends ZkNodeProps { return maxShardsPerNode; } + public String getZNode(){ + return znode; + } + public DocRouter getRouter() { return router; @@ -174,4 +190,12 @@ public class DocCollection extends ZkNodeProps { all.put(SHARDS, slices); jsonWriter.write(all); } + + public Replica getReplica(String coreNodeName) { + for (Slice slice : slices.values()) { + Replica replica = slice.getReplica(coreNodeName); + if (replica != null) return replica; + } + return null; + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 9754fd89966..39bd708abe6 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -38,12 +38,14 @@ import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -100,9 +102,14 @@ public class ZkStateReader implements Closeable { public static final String LEADER_ELECT_ZKNODE = "/leader_elect"; public static final String SHARD_LEADERS_ZKNODE = "leaders"; + private final Set watchedCollections = new HashSet(); + + /**These are collections which are actively watched by this instance . + * + */ + private Map watchedCollectionStates = new ConcurrentHashMap(); - // // convenience methods... should these go somewhere else? // @@ -163,7 +170,8 @@ public class ZkStateReader implements Closeable { log.info("path={} {}={} specified config exists in ZooKeeper", new Object[] {path, CONFIGNAME_PROP, configName}); } - + } else { + throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path); } } catch (KeeperException e) { @@ -252,22 +260,26 @@ public class ZkStateReader implements Closeable { return aliases; } - /*public Boolean checkValid(String coll, int version){ + public Boolean checkValid(String coll, int version) { DocCollection collection = clusterState.getCollectionOrNull(coll); - if(collection ==null) return null; - if(collection.getVersion() < version){ - log.info("server older than client {}<{}",collection.getVersion(),version); - DocCollection nu = getExternCollectionFresh(this, coll); - if(nu.getVersion()> collection.getVersion()){ - updateExternCollection(nu); + if (collection == null) return null; + if (collection.getZNodeVersion() < version) { + log.debug("server older than client {}<{}", collection.getZNodeVersion(), version); + DocCollection nu = getCollectionLive(this, coll); + if (nu.getZNodeVersion() > collection.getZNodeVersion()) { + updateWatchedCollection(nu); collection = nu; } } - if(collection.getVersion() == version) return Boolean.TRUE; - log.info("wrong version from client {}!={} ",version, collection.getVersion()); + + if (collection.getZNodeVersion() == version) { + return Boolean.TRUE; + } + + log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion()); + return Boolean.FALSE; - - }*/ + } public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException, InterruptedException { @@ -296,18 +308,9 @@ public class ZkStateReader implements Closeable { synchronized (ZkStateReader.this.getUpdateLock()) { // remake watch final Watcher thisWatch = this; - Stat stat = new Stat(); - byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat , - true); Set ln = ZkStateReader.this.clusterState.getLiveNodes(); - ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln); // update volatile - ZkStateReader.this.clusterState = clusterState; - -// HashSet all = new HashSet<>(colls);; -// all.addAll(clusterState.getAllInternalCollections()); -// all.remove(null); - + ZkStateReader.this.clusterState = constructState(ln, thisWatch); } } catch (KeeperException e) { if (e.code() == KeeperException.Code.SESSIONEXPIRED @@ -376,8 +379,7 @@ public class ZkStateReader implements Closeable { Set liveNodeSet = new HashSet<>(); liveNodeSet.addAll(liveNodes); - ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this); - this.clusterState = clusterState; + this.clusterState = constructState(liveNodeSet, null); zkClient.exists(ALIASES, new Watcher() { @@ -423,8 +425,68 @@ public class ZkStateReader implements Closeable { }, true); } updateAliases(); + //on reconnect of SolrZkClient re-add watchers for the watched external collections + synchronized (this) { + for (String watchedCollection : watchedCollections) { + addZkWatch(watchedCollection); + } + } } + private ClusterState constructState(Set ln, Watcher watcher) + throws KeeperException, InterruptedException { + Stat stat = new Stat(); + byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true); + ClusterState loadedData = ClusterState.load(stat.getVersion(), data, ln, + CLUSTER_STATE); + Map result = new LinkedHashMap<>(); + result.putAll(loadedData.getCollectionStates());// first load all + // collections in + // clusterstate.json + for (String s : getIndividualColls()) { + DocCollection watched = watchedCollectionStates.get(s); + if (watched != null) { + // if it is a watched collection, add too + result.put(s, new ClusterState.CollectionRef(watched)); + } else { + // if it is not collection, then just create a reference which can fetch + // the collection object just in time from ZK + final String collName = s; + result.put(s, new ClusterState.CollectionRef(null) { + @Override + public DocCollection get() { + return getCollectionLive(ZkStateReader.this, collName); + } + }); + } + } + return new ClusterState(ln, result, stat.getVersion()); + } + + + private Set getIndividualColls() throws KeeperException, InterruptedException { + List children = null; + try { + children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true); + } catch (KeeperException.NoNodeException e) { + log.warn("Error fetching collection names"); + + return new HashSet<>(); + } + if (children == null || children.isEmpty()) return new HashSet<>(); + HashSet result = new HashSet<>(children.size()); + + for (String c : children) { + try { + if (zkClient.exists(getCollectionPath(c), true)) { + result.add(c); + } + } catch (Exception e) { + log.warn("Error reading collections nodes", e); + } + } + return result; + } // load and publish a new CollectionInfo private synchronized void updateClusterState(boolean immediate, @@ -443,7 +505,7 @@ public class ZkStateReader implements Closeable { if (!onlyLiveNodes) { log.debug("Updating cloud state from ZooKeeper... "); - clusterState = ClusterState.load(zkClient, liveNodesSet,this); + clusterState = constructState(liveNodesSet,null); } else { log.debug("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size()); clusterState = this.clusterState; @@ -451,6 +513,11 @@ public class ZkStateReader implements Closeable { } this.clusterState = clusterState; } + synchronized (ZkStateReader.this) { + for (String watchedCollection : watchedCollections) { + updateWatchedCollection(getCollectionLive(this, watchedCollection)); + } + } } else { if (clusterStateUpdateScheduled) { @@ -475,13 +542,12 @@ public class ZkStateReader implements Closeable { if (!onlyLiveNodes) { log.debug("Updating cloud state from ZooKeeper... "); - - clusterState = ClusterState.load(zkClient, liveNodesSet,ZkStateReader.this); + + clusterState = constructState(liveNodesSet,null); } else { log.debug("Updating live nodes from ZooKeeper... "); clusterState = ZkStateReader.this.clusterState; clusterState.setLiveNodes(liveNodesSet); - } ZkStateReader.this.clusterState = clusterState; @@ -504,13 +570,18 @@ public class ZkStateReader implements Closeable { } // update volatile ZkStateReader.this.clusterState = clusterState; + + synchronized (ZkStateReader.this) { + for (String watchedCollection : watchedCollections) { + updateWatchedCollection(getCollectionLive(ZkStateReader.this, watchedCollection)); + } + } } } }, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS); } - } - + /** * @return information about the cluster from ZooKeeper */ @@ -679,4 +750,131 @@ public class ZkStateReader implements Closeable { } } + public static DocCollection getCollectionLive(ZkStateReader zkStateReader, + String coll) { + String collectionPath = getCollectionPath(coll); + try { + if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null; + Stat stat = new Stat(); + byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true); + ClusterState state = ClusterState.load(stat.getVersion(), data, + Collections. emptySet(), collectionPath); + ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll); + return collectionRef == null ? null : collectionRef.get(); + } catch (KeeperException.NoNodeException e) { + log.warn("No node available : " + collectionPath, e); + return null; + } catch (KeeperException e) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Could not load collection from ZK:" + coll, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SolrException(ErrorCode.BAD_REQUEST, + "Could not load collection from ZK:" + coll, e); + } + } + + public static String getCollectionPath(String coll) { + return COLLECTIONS_ZKNODE+"/"+coll + "/state.json"; + } + + public void addCollectionWatch(String coll) throws KeeperException, InterruptedException { + synchronized (this) { + if (watchedCollections.contains(coll)) return; + else { + watchedCollections.add(coll); + } + addZkWatch(coll); + } + } + + private void addZkWatch(final String coll) throws KeeperException, + InterruptedException { + log.info("addZkWatch {}", coll); + final String fullpath = getCollectionPath(coll); + synchronized (getUpdateLock()) { + + cmdExecutor.ensureExists(fullpath, zkClient); + log.info("Updating collection state at {} from ZooKeeper... ", fullpath); + + Watcher watcher = new Watcher() { + + @Override + public void process(WatchedEvent event) { + // session events are not change events, + // and do not remove the watcher + if (EventType.None.equals(event.getType())) { + return; + } + log.info("A cluster state change: {}, has occurred - updating... ", + (event), ZkStateReader.this.clusterState == null ? 0 + : ZkStateReader.this.clusterState.getLiveNodes().size()); + try { + + // delayed approach + // ZkStateReader.this.updateClusterState(false, false); + synchronized (ZkStateReader.this.getUpdateLock()) { + if (!watchedCollections.contains(coll)) { + log.info("Unwatched collection {}", coll); + return; + } + // remake watch + final Watcher thisWatch = this; + Stat stat = new Stat(); + byte[] data = zkClient.getData(fullpath, thisWatch, stat, true); + + if (data == null || data.length == 0) { + log.warn("No value set for collection state : {}", coll); + return; + + } + ClusterState clusterState = ClusterState.load(stat.getVersion(), + data, Collections. emptySet(), fullpath); + // update volatile + + DocCollection newState = clusterState.getCollectionStates() + .get(coll).get(); + updateWatchedCollection(newState); + + } + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.SESSIONEXPIRED + || e.code() == KeeperException.Code.CONNECTIONLOSS) { + log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); + return; + } + log.error("Unwatched collection :" + coll, e); + throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Unwatched collection :" + coll, e); + return; + } + } + + }; + zkClient.exists(fullpath, watcher, true); + } + updateWatchedCollection(getCollectionLive(this, coll)); + } + + private void updateWatchedCollection(DocCollection newState) { + watchedCollectionStates.put(newState.getName(), newState); + log.info("Updating data for {} to ver {} ", newState.getName(), + newState.getZNodeVersion()); + + this.clusterState = clusterState.copyWith(Collections.singletonMap( + newState.getName(), newState)); + } + + /** This is not a public API. Only used by ZkController */ + public void removeZKWatch(final String coll) { + synchronized (this) { + watchedCollections.remove(coll); + clusterState = clusterState.copyWith(Collections + . singletonMap(coll, null)); + } + } + } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java index 6b3c80491fb..1ba18adb2df 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java @@ -20,7 +20,6 @@ package org.apache.solr.client.solrj.impl; import java.io.File; import java.io.IOException; import java.net.MalformedURLException; -import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; @@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.AbstractFullDistribZkTestBase; import org.apache.solr.cloud.AbstractZkTestCase; -import org.apache.solr.cloud.Overseer; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; @@ -51,7 +49,6 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; -import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; @@ -62,7 +59,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,6 +121,7 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase { @Override public void doTest() throws Exception { allTests(); + stateVersionParamTest(); } private void allTests() throws Exception { @@ -345,7 +342,77 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase { SolrInputDocument doc = getDoc(fields); indexDoc(doc); } - + + private void stateVersionParamTest() throws Exception { + CloudSolrServer client = createCloudClient(null); + try { + String collectionName = "checkStateVerCol"; + createCollection(collectionName, client, 2, 2); + waitForRecoveriesToFinish(collectionName, false); + DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName); + Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next(); + + HttpSolrServer httpSolrServer = new HttpSolrServer(r.getStr(ZkStateReader.BASE_URL_PROP) + "/"+collectionName); + + + SolrQuery q = new SolrQuery().setQuery("*:*"); + + log.info("should work query, result {}", httpSolrServer.query(q)); + //no problem + q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion()); + log.info("2nd query , result {}", httpSolrServer.query(q)); + //no error yet good + + q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+ (coll.getZNodeVersion() -1)); //an older version expect error + + HttpSolrServer.RemoteSolrException sse = null; + try { + httpSolrServer.query(q); + log.info("expected query error"); + } catch (HttpSolrServer.RemoteSolrException e) { + sse = e; + } + httpSolrServer.shutdown(); + assertNotNull(sse); + assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code); + + //now send the request to another node that does n ot serve the collection + + Set allNodesOfColl = new HashSet<>(); + for (Slice slice : coll.getSlices()) { + for (Replica replica : slice.getReplicas()) { + allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP)); + } + } + String theNode = null; + for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) { + String n = client.getZkStateReader().getBaseUrlForNodeName(s); + if(!allNodesOfColl.contains(s)){ + theNode = n; + break; + } + } + log.info("thenode which does not serve this collection{} ",theNode); + assertNotNull(theNode); + httpSolrServer = new HttpSolrServer(theNode + "/"+collectionName); + + q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion()); + + try { + httpSolrServer.query(q); + log.info("error was expected"); + } catch (HttpSolrServer.RemoteSolrException e) { + sse = e; + } + httpSolrServer.shutdown(); + assertNotNull(sse); + assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code); + } finally { + client.shutdown(); + } + + } + public void testShutdown() throws MalformedURLException { CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332"); try { diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java index 270d07338c6..971841a33dd 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java @@ -61,6 +61,7 @@ import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; @@ -342,6 +343,19 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes return createJettys(numJettys, false); } + protected int defaultStateFormat = 1 + random().nextInt(2); + + protected int getStateFormat() { + String stateFormat = System.getProperty("tests.solr.stateFormat", null); + if (stateFormat != null) { + if ("2".equals(stateFormat)) { + return defaultStateFormat = 2; + } else if ("1".equals(stateFormat)) { + return defaultStateFormat = 1; + } + } + return defaultStateFormat; // random + } /** * @param checkCreatedVsState @@ -354,6 +368,18 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes List clients = new ArrayList<>(); StringBuilder sb = new StringBuilder(); + if (getStateFormat() == 2) { + log.info("Creating collection1 with stateFormat=2"); + SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), + AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT); + Overseer.getInQueue(zkClient).offer( + ZkStateReader.toJSON(ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, + OverseerCollectionProcessor.CREATECOLLECTION, "name", + DEFAULT_COLLECTION, "numShards", String.valueOf(sliceCount), + DocCollection.STATE_FORMAT, getStateFormat()))); + zkClient.close(); + } + for (int i = 1; i <= numJettys; i++) { if (sb.length() > 0) sb.append(','); int cnt = this.jettyIntCntr.incrementAndGet(); @@ -1501,6 +1527,10 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes collectionInfos.put(collectionName, list); } params.set("name", collectionName); + if (getStateFormat() == 2) { + log.info("Creating collection with stateFormat=2: " + collectionName); + params.set(DocCollection.STATE_FORMAT, "2"); + } SolrRequest request = new QueryRequest(params); request.setPath("/admin/collections");