From 2f28cc16e0e419e52e1892578b2bc2e941507790 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Thu, 3 Jul 2014 11:13:35 +0000 Subject: [PATCH] reverting SOLR-5473 , SOLR-5474 git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1607587 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 4 - .../org/apache/solr/SolrLogFormatter.java | 2 +- .../java/org/apache/solr/cloud/Overseer.java | 133 +++------- .../cloud/OverseerCollectionProcessor.java | 16 +- .../org/apache/solr/cloud/ZkController.java | 14 - .../handler/admin/CollectionsHandler.java | 51 ++-- .../solr/servlet/SolrDispatchFilter.java | 18 -- .../solr/servlet/ZookeeperInfoServlet.java | 89 ++----- .../org/apache/solr/util/SolrLogLayout.java | 2 +- .../org/apache/solr/cloud/AssignTest.java | 2 +- .../apache/solr/cloud/ClusterStateTest.java | 15 +- .../solr/cloud/ExternalCollectionsTest.java | 119 --------- .../org/apache/solr/cloud/SliceStateTest.java | 4 +- .../client/solrj/impl/CloudSolrServer.java | 249 +----------------- .../solr/common/cloud/ClusterState.java | 111 ++++---- .../solr/common/cloud/DocCollection.java | 36 +-- .../solr/common/cloud/SolrZkClient.java | 1 - .../solr/common/cloud/ZkStateReader.java | 245 ++--------------- .../solrj/impl/CloudSolrServerTest.java | 77 +----- .../cloud/AbstractFullDistribZkTestBase.java | 28 -- 20 files changed, 183 insertions(+), 1033 deletions(-) delete mode 100644 solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 6267bf39bb2..0614b5ebcbb 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -70,10 +70,6 @@ New Features * SOLR-6103: Added DateRangeField for indexing date ranges, especially multi-valued ones. Based on LUCENE-5648. (David Smiley) -* SOLR-5473: Make one state.json per collection (noble, shalin, Timothy Potter ,Jessica Cheng, Anshum Gupta, Mark Miller) - -* SOLR-5474: Add stateFormat=2 support to CloudSolrServer (Timothy Potter , noble , Jessica Cheng) - Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/SolrLogFormatter.java b/solr/core/src/java/org/apache/solr/SolrLogFormatter.java index f3de9052555..ff1cf222c63 100644 --- a/solr/core/src/java/org/apache/solr/SolrLogFormatter.java +++ b/solr/core/src/java/org/apache/solr/SolrLogFormatter.java @@ -263,7 +263,7 @@ sb.append("(group_name=").append(tg.getName()).append(")"); private Map getReplicaProps(ZkController zkController, SolrCore core) { final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName(); - Replica replica = zkController.getClusterState().getReplica(collection, core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(),true); + Replica replica = zkController.getClusterState().getReplica(collection, core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName()); if(replica!=null) { return replica.getProperties(); } diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index c7a08c97264..f31b5e12ec9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -33,7 +33,6 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -105,8 +104,6 @@ public class Overseer { private Map clusterProps; private boolean isClosed = false; - private final Map updateNodes = new ConcurrentHashMap<>(); - private boolean isClusterStateModified = false; public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) { @@ -120,7 +117,6 @@ public class Overseer { this.myId = myId; this.reader = reader; clusterProps = reader.getClusterProps(); - reader.setEphemeralCollectionData(Collections.unmodifiableMap(updateNodes)); } public Stats getStateUpdateQueueStats() { @@ -261,7 +257,6 @@ public class Overseer { stateUpdateQueue.poll(); if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break; - if(!updateNodes.isEmpty()) break; // if an event comes in the next 100ms batch it together head = stateUpdateQueue.peek(100); } @@ -301,28 +296,8 @@ public class Overseer { TimerContext timerContext = stats.time("update_state"); boolean success = false; try { - if(!updateNodes.isEmpty()) { - for (Entry e : updateNodes.entrySet()) { - if (e.getValue() == null) { - if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true); - } else { - if (zkClient.exists(e.getKey(), true)) { - log.info("going to update_collection {}", e.getKey()); - zkClient.setData(e.getKey(), ZkStateReader.toJSON(e.getValue()), true); - } else { - log.info("going to create_collection {}", e.getValue()); - zkClient.create(e.getKey(), ZkStateReader.toJSON(e.getValue()), CreateMode.PERSISTENT, true); - } - } - } - updateNodes.clear(); - } - - if(isClusterStateModified) { - lastUpdatedTime = System.nanoTime(); - zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true); - isClusterStateModified = false; - } + zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true); + lastUpdatedTime = System.nanoTime(); success = true; } finally { timerContext.stop(); @@ -724,7 +699,7 @@ public class Overseer { } Slice slice = clusterState.getSlice(collection, sliceName); - + Map replicaProps = new LinkedHashMap<>(); replicaProps.putAll(message.getProperties()); @@ -742,7 +717,7 @@ public class Overseer { replicaProps.remove(ZkStateReader.SHARD_ID_PROP); replicaProps.remove(ZkStateReader.COLLECTION_PROP); replicaProps.remove(QUEUE_OPERATION); - + // remove any props with null values Set> entrySet = replicaProps.entrySet(); List removeKeys = new ArrayList<>(); @@ -890,18 +865,10 @@ public class Overseer { } collectionProps.put(DocCollection.DOC_ROUTER, routerSpec); - if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true"); - String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null : ZkStateReader.getCollectionPath(collectionName); - DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router , -1,znode); - isClusterStateModified = true; - log.info("state version {} {}", collectionName, newCollection.getStateFormat()); - if (newCollection.getStateFormat() > 1) { - updateNodes.put(ZkStateReader.getCollectionPath(collectionName), - new ClusterState(-1, Collections.emptySet(), singletonMap(newCollection.getName(), newCollection), state.getStateReader())); - return state; + if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true"); + DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router); + return newState(state, singletonMap(newCollection.getName(), newCollection)); } - return newState(state, singletonMap(newCollection.getName(), newCollection)); - } /* * Return an already assigned id or null if not assigned @@ -938,27 +905,30 @@ public class Overseer { } return null; } - + private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) { // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates())); // System.out.println("Updating slice:" + slice); - DocCollection newCollection = null; + DocCollection coll = state.getCollectionOrNull(collectionName) ; Map slices; + Map props; + DocRouter router; + if (coll == null) { // when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself // without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router. - slices = new LinkedHashMap<>(1); - slices.put(slice.getName(), slice); - Map props = new HashMap<>(1); + slices = new HashMap<>(1); + props = new HashMap<>(1); props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME)); - newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter()); + router = new ImplicitDocRouter(); } else { + props = coll.getProperties(); + router = coll.getRouter(); slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy - slices.put(slice.getName(), slice); - newCollection = coll.copyWith(slices); } - + slices.put(slice.getName(), slice); + DocCollection newCollection = new DocCollection(collectionName, slices, props, router); // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections)); @@ -1017,54 +987,27 @@ public class Overseer { } - DocCollection newCollection = coll.copyWith(slices); + DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter()); return newState(state, singletonMap(collectionName, newCollection)); } - private ClusterState newState(ClusterState state, Map colls) { - for (Entry e : colls.entrySet()) { - DocCollection c = e.getValue(); - if (c == null) { - isClusterStateModified = true; - state = state.copyWith(singletonMap(e.getKey(), (DocCollection) null)); - continue; - } - - if (c.getStateFormat() >1) { - state.getStateReader().updateWatchedCollection(c); - updateNodes.put(ZkStateReader.getCollectionPath(c.getName()), new ClusterState(-1, Collections.emptySet(), singletonMap(c.getName(), c), state.getStateReader())); - } else { - isClusterStateModified = true; - state = state.copyWith(singletonMap(e.getKey(), c)); - } + private ClusterState newState(ClusterState state, Map colls) { + return state.copyWith(colls); } - return state; - } - /* - * Remove collection from cloudstate - */ - private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) { - final String collection = message.getStr("name"); - if (!checkKeyExistence(message, "name")) return clusterState; - DocCollection coll = clusterState.getCollectionOrNull(collection); - if(coll !=null) { - isClusterStateModified = true; - if(coll.getStateFormat()>1){ - try { - log.info("Deleting state for collection : {}", collection); - zkClient.delete(ZkStateReader.getCollectionPath(collection),-1,true); - } catch (Exception e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"Unable to remove collection state :"+collection); - - } - return clusterState; - } else{ + /* + * Remove collection from cloudstate + */ + private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) { + final String collection = message.getStr("name"); + if (!checkKeyExistence(message, "name")) return clusterState; + DocCollection coll = clusterState.getCollectionOrNull(collection); + if(coll !=null) { return clusterState.copyWith(singletonMap(collection,(DocCollection)null)); } + return clusterState; } - return clusterState; - } + /* * Remove collection slice from cloudstate */ @@ -1080,7 +1023,7 @@ public class Overseer { Map newSlices = new LinkedHashMap<>(coll.getSlicesMap()); newSlices.remove(sliceId); - DocCollection newCollection = coll.copyWith(newSlices); + DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter()); return newState(clusterState, singletonMap(collection,newCollection)); } @@ -1092,6 +1035,8 @@ public class Overseer { final String collection = message.getStr(ZkStateReader.COLLECTION_PROP); if (!checkCollectionKeyExistence(message)) return clusterState; +// final Map newCollections = new LinkedHashMap<>(clusterState.getCollectionStates()); // shallow copy +// DocCollection coll = newCollections.get(collection); DocCollection coll = clusterState.getCollectionOrNull(collection) ; if (coll == null) { // TODO: log/error that we didn't find it? @@ -1129,7 +1074,7 @@ public class Overseer { newSlices.put(slice.getName(), slice); } } - + if (lastSlice) { // remove all empty pre allocated slices for (Slice slice : coll.getSlices()) { @@ -1146,7 +1091,7 @@ public class Overseer { // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or // ZkController out of the Overseer. try { - zkClient.delete("/collections/" + collection,-1,true); + zkClient.clean("/collections/" + collection); } catch (InterruptedException e) { SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e); Thread.currentThread().interrupt(); @@ -1156,8 +1101,8 @@ public class Overseer { return newState(clusterState,singletonMap(collection, (DocCollection) null)); } else { - DocCollection newCollection = coll.copyWith(newSlices); - return newState(clusterState,singletonMap(collection,newCollection)); + DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter()); + return newState(clusterState,singletonMap(collection,newCollection)); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index e9c17fc7fee..3dee8a3c45c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -707,13 +707,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { Set collections = clusterState.getCollections(); for (String name : collections) { Map collectionStatus = null; - if (clusterState.getCollection(name).getStateFormat()>1) { - bytes = ZkStateReader.toJSON(clusterState.getCollection(name)); - Map docCollection = (Map) ZkStateReader.fromJSON(bytes); - collectionStatus = getCollectionStatus(docCollection, name, shard); - } else { - collectionStatus = getCollectionStatus((Map) stateMap.get(name), name, shard); - } + collectionStatus = getCollectionStatus((Map) stateMap.get(name), name, shard); if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) { collectionStatus.put("aliases", collectionVsAliases.get(name)); } @@ -722,12 +716,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { } else { String routeKey = message.getStr(ShardParams._ROUTE_); Map docCollection = null; - if (clusterState.getCollection(collection).getStateFormat()>1 ) { - bytes = ZkStateReader.toJSON(clusterState.getCollection(collection)); - docCollection = (Map) ZkStateReader.fromJSON(bytes); - } else { - docCollection = (Map) stateMap.get(collection); - } + + docCollection = (Map) stateMap.get(collection); if (routeKey == null) { Map collectionStatus = getCollectionStatus(docCollection, collection, shard); if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 94082212159..9d165f29fce 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1109,16 +1109,6 @@ public final class ZkController { } CloudDescriptor cloudDescriptor = cd.getCloudDescriptor(); - boolean removeWatch = true; - for (SolrCore solrCore : cc.getCores()) {//if there is no SolrCoe which is a member of this collection, remove the watch - CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor(); - if (cloudDesc != null && cloudDescriptor.getCollectionName().equals(cloudDesc.getCollectionName())) { - //means - removeWatch = false; - break; - } - } - if(removeWatch) zkStateReader.removeZKWatch(collection); ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.NODE_NAME_PROP, getNodeName(), @@ -1421,10 +1411,6 @@ public final class ZkController { publish(cd, ZkStateReader.DOWN, false, true); DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName()); - if(collection !=null && collection.getStateFormat() >1 ){ - log.info("Registering watch for collection {}",cd.getCloudDescriptor().getCollectionName()); - zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName()); - } } catch (KeeperException e) { log.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index ea4ef1ce354..380435bcfb5 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -143,7 +143,7 @@ public class CollectionsHandler extends RequestHandlerBase { if (action == null) { throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a); } - + switch (action) { case CREATE: { this.handleCreateAction(req, rsp); @@ -320,36 +320,36 @@ public class CollectionsHandler extends RequestHandlerBase { SolrQueryResponse rsp) throws KeeperException, InterruptedException { handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT); } - + private void handleResponse(String operation, ZkNodeProps m, SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException { long time = System.nanoTime(); if(m.containsKey(ASYNC) && m.get(ASYNC) != null) { - + String asyncId = m.getStr(ASYNC); - + if(asyncId.equals("-1")) { throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes."); } - + NamedList r = new NamedList<>(); - + if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) || coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) || coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) || overseerCollectionQueueContains(asyncId)) { r.add("error", "Task with the same requestid already exists."); - + } else { coreContainer.getZkController().getOverseerCollectionQueue() .offer(ZkStateReader.toJSON(m)); } r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC)); SolrResponse response = new OverseerSolrResponse(r); - + rsp.getValues().addAll(response.getResponse()); - + return; } @@ -380,27 +380,27 @@ public class CollectionsHandler extends RequestHandlerBase { } } } - + private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { log.info("Reloading Collection : " + req.getParamString()); String name = req.getParams().required().get("name"); - + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.RELOADCOLLECTION, "name", name); handleResponse(OverseerCollectionProcessor.RELOADCOLLECTION, m, rsp); } - + private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException { log.info("Syncing shard : " + req.getParamString()); String collection = req.getParams().required().get("collection"); String shard = req.getParams().required().get("shard"); - + ClusterState clusterState = coreContainer.getZkController().getClusterState(); - + ZkNodeProps leaderProps = clusterState.getLeader(collection, shard); ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps); - + HttpSolrServer server = new HttpSolrServer(nodeProps.getBaseUrl()); try { server.setConnectionTimeout(15000); @@ -414,36 +414,36 @@ public class CollectionsHandler extends RequestHandlerBase { server.shutdown(); } } - + private void handleCreateAliasAction(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { log.info("Create alias action : " + req.getParamString()); String name = req.getParams().required().get("name"); String collections = req.getParams().required().get("collections"); - + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.CREATEALIAS, "name", name, "collections", collections); - + handleResponse(OverseerCollectionProcessor.CREATEALIAS, m, rsp); } - + private void handleDeleteAliasAction(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { log.info("Delete alias action : " + req.getParamString()); String name = req.getParams().required().get("name"); - + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.DELETEALIAS, "name", name); - + handleResponse(OverseerCollectionProcessor.DELETEALIAS, m, rsp); } private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { log.info("Deleting Collection : " + req.getParamString()); - + String name = req.getParams().required().get("name"); - + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.DELETECOLLECTION, "name", name); @@ -464,7 +464,7 @@ public class CollectionsHandler extends RequestHandlerBase { throw new SolrException(ErrorCode.BAD_REQUEST, "Collection name is required to create a new collection"); } - + Map props = ZkNodeProps.makeMap( Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.CREATECOLLECTION, @@ -477,7 +477,6 @@ public class CollectionsHandler extends RequestHandlerBase { MAX_SHARDS_PER_NODE, CREATE_NODE_SET , SHARDS_PROP, - DocCollection.STATE_FORMAT, ASYNC, "router."); @@ -554,7 +553,7 @@ public class CollectionsHandler extends RequestHandlerBase { log.info("Deleting Shard : " + req.getParamString()); String name = req.getParams().required().get(ZkStateReader.COLLECTION_PROP); String shard = req.getParams().required().get(ZkStateReader.SHARD_ID_PROP); - + Map props = new HashMap<>(); props.put(ZkStateReader.COLLECTION_PROP, name); props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.DELETESHARD); diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index 70d8f526b53..13c890387e6 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -324,7 +324,6 @@ public class SolrDispatchFilter extends BaseSolrFilter { String coreUrl = getRemotCoreUrl(cores, corename, origCorename); // don't proxy for internal update requests SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString()); - checkStateIsValid(cores, queryParams.get(CloudSolrServer.STATE_VERSION)); if (coreUrl != null && queryParams .get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) { @@ -380,7 +379,6 @@ public class SolrDispatchFilter extends BaseSolrFilter { if( "/select".equals( path ) || "/select/".equals( path ) ) { solrReq = parser.parse( core, path, req ); - checkStateIsValid(cores,solrReq.getParams().get(CloudSolrServer.STATE_VERSION)); String qt = solrReq.getParams().get( CommonParams.QT ); handler = core.getRequestHandler( qt ); if( handler == null ) { @@ -470,22 +468,6 @@ public class SolrDispatchFilter extends BaseSolrFilter { chain.doFilter(request, response); } - private void checkStateIsValid(CoreContainer cores, String stateVer) { - if(stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware() ){ - // many have multiple collections separated by | - String[] pairs = StringUtils.split(stateVer, '|'); - for (String pair : pairs) { - String[] pcs = StringUtils.split(pair, ':'); - if(pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()){ - Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0],Integer.parseInt(pcs[1])); - - if(Boolean.TRUE != status){ - throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair+ "valid : "+status); - } - } - } - } - } private void processAliases(SolrQueryRequest solrReq, Aliases aliases, List collectionsList) { diff --git a/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java b/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java index e04a18d83a2..e5c2e540d95 100644 --- a/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java +++ b/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java @@ -17,26 +17,6 @@ package org.apache.solr.servlet; -import org.apache.lucene.util.BytesRef; -import org.apache.solr.cloud.ZkController; -import org.apache.solr.common.SolrException; -import org.apache.solr.common.cloud.SolrZkClient; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.params.SolrParams; -import org.apache.solr.core.CoreContainer; -import org.apache.solr.util.FastWriter; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.noggit.CharArr; -import org.noggit.JSONWriter; -import org.noggit.ObjectBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; @@ -49,6 +29,30 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.lucene.util.BytesRef; +import org.noggit.CharArr; +import org.noggit.JSONWriter; +import org.noggit.ObjectBuilder; +import org.apache.solr.cloud.ZkController; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.util.FastWriter; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.Stat; +import org.noggit.CharArr; +import org.noggit.JSONWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Zookeeper Info @@ -86,6 +90,7 @@ public final class ZookeeperInfoServlet extends HttpServlet { String path = params.get("path"); String addr = params.get("addr"); + boolean all = "true".equals(params.get("all")); if (addr != null && addr.length() == 0) { addr = null; @@ -105,7 +110,6 @@ public final class ZookeeperInfoServlet extends HttpServlet { ZKPrinter printer = new ZKPrinter(response, out, cores.getZkController(), addr); printer.detail = detail; printer.dump = dump; - printer.isTreeView = (params.get("wt") == null); // this is hacky but tree view requests don't come in with the wt set try { printer.print(path); @@ -136,8 +140,6 @@ public final class ZookeeperInfoServlet extends HttpServlet { boolean detail = false; boolean dump = false; - boolean isTreeView = false; - String addr; // the address passed to us String keeperAddr; // the address we're connected to @@ -383,47 +385,6 @@ public final class ZookeeperInfoServlet extends HttpServlet { dataStrErr = "data is not parsable as a utf8 String: " + e.toString(); } } - // pull in external collections too - if (ZkStateReader.CLUSTER_STATE.equals(path) && !isTreeView) { - SortedMap collectionStates = null; - List children = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, null, true); - java.util.Collections.sort(children); - for (String collection : children) { - String collStatePath = ZkStateReader.getCollectionPath(collection); - String childDataStr = null; - try { - byte[] childData = zkClient.getData(collStatePath, null, null, true); - if (childData != null) { - childDataStr = (new BytesRef(childData)).utf8ToString(); - } - } catch (KeeperException.NoNodeException nne) { - // safe to ignore - } catch (Exception childErr) { - log.error("Failed to get "+collStatePath+" due to: "+childErr); - } - - if (childDataStr != null) { - if (collectionStates == null) { - // initialize lazily as there may not be any external collections - collectionStates = new TreeMap<>(); - - // add the internal collections - if (dataStr != null) - collectionStates.putAll((Map)ObjectBuilder.fromJSON(dataStr)); - } - - // now add in the external collections - Map extColl = (Map)ObjectBuilder.fromJSON(childDataStr); - collectionStates.put(collection, extColl.get(collection)); - } - } - - if (collectionStates != null) { - CharArr out = new CharArr(); - new JSONWriter(out, 2).write(collectionStates); - dataStr = out.toString(); - } - } json.writeString("znode"); json.writeNameSeparator(); diff --git a/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java b/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java index 9fa3c7c7d4a..6e8f8aa33c9 100644 --- a/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java +++ b/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java @@ -234,7 +234,7 @@ public class SolrLogLayout extends Layout { private Map getReplicaProps(ZkController zkController, SolrCore core) { final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName(); - Replica replica = zkController.getZkStateReader().getClusterState(). getReplica(collection, zkController.getCoreNodeName(core.getCoreDescriptor()), true); + Replica replica = zkController.getClusterState().getReplica(collection, zkController.getCoreNodeName(core.getCoreDescriptor())); if(replica!=null) { return replica.getProperties(); } diff --git a/solr/core/src/test/org/apache/solr/cloud/AssignTest.java b/solr/core/src/test/org/apache/solr/cloud/AssignTest.java index a92099c0526..5d56e6c953a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/AssignTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/AssignTest.java @@ -86,7 +86,7 @@ public class AssignTest extends SolrTestCaseJ4 { collectionStates.put(cname, docCollection); Set liveNodes = new HashSet<>(); - ClusterState state = new ClusterState(-1,liveNodes, collectionStates, ClusterStateTest.getMockZkStateReader(collectionStates.keySet())); + ClusterState state = new ClusterState(-1,liveNodes, collectionStates); String nodeName = Assign.assignNode("collection1", state); assertEquals("core_node2", nodeName); diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java index 4db259e7d6f..8959140cd0e 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java @@ -62,10 +62,10 @@ public class ClusterStateTest extends SolrTestCaseJ4 { collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT)); ZkStateReader zkStateReaderMock = getMockZkStateReader(collectionStates.keySet()); - ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates,zkStateReaderMock); + ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates); byte[] bytes = ZkStateReader.toJSON(clusterState); // System.out.println("#################### " + new String(bytes)); - ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes,zkStateReaderMock,null); + ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes); assertEquals("Provided liveNodes not used properly", 2, loadedClusterState .getLiveNodes().size()); @@ -73,13 +73,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 { assertEquals("Poperties not copied properly", replica.getStr("prop1"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop1")); assertEquals("Poperties not copied properly", replica.getStr("prop2"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop2")); - loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes, getMockZkStateReader(Collections.emptySet()),null ); + loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes); assertEquals("Provided liveNodes not used properly", 2, loadedClusterState .getLiveNodes().size()); assertEquals("Should not have collections", 0, loadedClusterState.getCollections().size()); - loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes,getMockZkStateReader(Collections.emptySet()),null); + loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes); assertEquals("Provided liveNodes not used properly", 2, loadedClusterState .getLiveNodes().size()); @@ -89,13 +89,6 @@ public class ClusterStateTest extends SolrTestCaseJ4 { public static ZkStateReader getMockZkStateReader(final Set collections) { ZkStateReader mock = createMock(ZkStateReader.class); EasyMock.reset(mock); - mock.getAllCollections(); - EasyMock.expectLastCall().andAnswer(new IAnswer>() { - @Override - public Set answer() throws Throwable { - return collections; - } - }).anyTimes(); EasyMock.replay(mock); return mock; diff --git a/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java b/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java deleted file mode 100644 index 3456ff52853..00000000000 --- a/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java +++ /dev/null @@ -1,119 +0,0 @@ -package org.apache.solr.cloud; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.apache.solr.client.solrj.impl.CloudSolrServer; -import org.apache.solr.client.solrj.request.QueryRequest; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.DocCollection; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.params.CollectionParams; -import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.zookeeper.data.Stat; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; - -public class ExternalCollectionsTest extends AbstractFullDistribZkTestBase { - private CloudSolrServer client; - - @BeforeClass - public static void beforeThisClass2() throws Exception { - - } - - @Before - @Override - public void setUp() throws Exception { - super.setUp(); - System.setProperty("numShards", Integer.toString(sliceCount)); - System.setProperty("solr.xml.persist", "true"); - client = createCloudClient(null); - } - - @After - public void tearDown() throws Exception { - super.tearDown(); - client.shutdown(); - } - - protected String getSolrXml() { - return "solr-no-core.xml"; - } - - public ExternalCollectionsTest() { - fixShardCount = true; - - sliceCount = 2; - shardCount = 4; - - checkCreatedVsState = false; - } - - - @Override - public void doTest() throws Exception { - testZkNodeLocation(); - } - - - boolean externalColl = false; - - @Override - protected int getStateFormat() { - return externalColl ? 2:1; - } - - private void testZkNodeLocation() throws Exception{ - externalColl=true; - - String collectionName = "myExternColl"; - - createCollection(collectionName, client, 2, 2); - - waitForRecoveriesToFinish(collectionName, false); - assertTrue("does not exist collection state externally", - cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true)); - Stat stat = new Stat(); - byte[] data = cloudClient.getZkStateReader().getZkClient().getData(ZkStateReader.getCollectionPath(collectionName), null, stat, true); - DocCollection c = ZkStateReader.getCollectionLive(cloudClient.getZkStateReader(), collectionName); - ClusterState clusterState = cloudClient.getZkStateReader().getClusterState(); - assertEquals("The zkversion of the nodes must be same zkver:" + stat.getVersion() , stat.getVersion(),clusterState.getCollection(collectionName).getZNodeVersion() ); - assertTrue("DocCllection#getStateFormat() must be > 1", cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getStateFormat() > 1); - - - // remove collection - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set("action", CollectionParams.CollectionAction.DELETE.toString()); - params.set("name", collectionName); - QueryRequest request = new QueryRequest(params); - request.setPath("/admin/collections"); - if (client == null) { - client = createCloudClient(null); - } - - client.request(request); - - checkForMissingCollection(collectionName); - assertFalse("collection state should not exist externally", cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true)); - - } -} - - - diff --git a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java index 49f43de21a1..461ef446225 100644 --- a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java @@ -50,9 +50,9 @@ public class SliceStateTest extends SolrTestCaseJ4 { collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT)); ZkStateReader mockZkStateReader = ClusterStateTest.getMockZkStateReader(collectionStates.keySet()); - ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates, mockZkStateReader); + ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates); byte[] bytes = ZkStateReader.toJSON(clusterState); - ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes, mockZkStateReader,null); + ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes); assertEquals("Default state not set to active", "active", loadedClusterState.getSlice("collection1", "shard1").getState()); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java index 51fa73fd2a5..019bee063ea 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java @@ -18,9 +18,7 @@ package org.apache.solr.client.solrj.impl; */ import java.io.IOException; -import java.net.ConnectException; import java.net.MalformedURLException; -import java.net.SocketException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -32,16 +30,13 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; -import org.apache.http.NoHttpResponseException; import org.apache.http.client.HttpClient; -import org.apache.http.conn.ConnectTimeoutException; import org.apache.solr.client.solrj.ResponseParser; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServer; @@ -72,8 +67,6 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.solr.common.util.StrUtils; import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * SolrJ client class to communicate with SolrCloud. @@ -86,8 +79,6 @@ import org.slf4j.LoggerFactory; * with {@link #setIdField(String)}. */ public class CloudSolrServer extends SolrServer { - private static final Logger log = LoggerFactory.getLogger(CloudSolrServer.class); - private volatile ZkStateReader zkStateReader; private String zkHost; // the zk server address private int zkConnectTimeout = 10000; @@ -96,8 +87,6 @@ public class CloudSolrServer extends SolrServer { private final LBHttpSolrServer lbServer; private final boolean shutdownLBHttpSolrServer; private HttpClient myClient; - //no of times collection state to be reloaded if stale state error is received - private static final int MAX_STALE_RETRIES = 5; Random rand = new Random(); private final boolean updatesToLeaders; @@ -106,7 +95,6 @@ public class CloudSolrServer extends SolrServer { .newCachedThreadPool(new SolrjNamedThreadFactory( "CloudSolrServer ThreadPool")); private String idField = "id"; - public static final String STATE_VERSION = "_stateVer_"; private final Set NON_ROUTABLE_PARAMS; { NON_ROUTABLE_PARAMS = new HashSet<>(); @@ -124,36 +112,8 @@ public class CloudSolrServer extends SolrServer { // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK); } - private volatile long timeToLive = 60* 1000L; - protected Map collectionStateCache = new ConcurrentHashMap(){ - @Override - public ExpiringCachedDocCollection get(Object key) { - ExpiringCachedDocCollection val = super.get(key); - if(val == null) return null; - if(val.isExpired(timeToLive)) { - super.remove(key); - return null; - } - return val; - } - - }; - - class ExpiringCachedDocCollection { - DocCollection cached; - long cachedAt; - - ExpiringCachedDocCollection(DocCollection cached) { - this.cached = cached; - this.cachedAt = System.currentTimeMillis(); - } - - boolean isExpired(long timeToLive) { - return (System.currentTimeMillis() - cachedAt) > timeToLive; - } - } /** * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state, @@ -167,8 +127,6 @@ public class CloudSolrServer extends SolrServer { this.lbServer.setParser(new BinaryResponseParser()); this.updatesToLeaders = true; shutdownLBHttpSolrServer = true; - setupStateVerParamOnQueryString(lbServer); - } public CloudSolrServer(String zkHost, boolean updatesToLeaders) @@ -180,15 +138,6 @@ public class CloudSolrServer extends SolrServer { this.lbServer.setParser(new BinaryResponseParser()); this.updatesToLeaders = updatesToLeaders; shutdownLBHttpSolrServer = true; - setupStateVerParamOnQueryString(lbServer); - } - - /**Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json - * @param seconds ttl value in seconds - */ - public void setCollectionCacheTTl(int seconds){ - assert seconds > 0; - timeToLive = seconds*1000L; } /** @@ -201,7 +150,6 @@ public class CloudSolrServer extends SolrServer { this.lbServer = lbServer; this.updatesToLeaders = true; shutdownLBHttpSolrServer = false; - setupStateVerParamOnQueryString(lbServer); } /** @@ -215,24 +163,8 @@ public class CloudSolrServer extends SolrServer { this.lbServer = lbServer; this.updatesToLeaders = updatesToLeaders; shutdownLBHttpSolrServer = false; - setupStateVerParamOnQueryString(lbServer); - } - /** - * Used internally to setup the _stateVer_ param to be sent in the query string of requests - * coming from this instance. - */ - protected void setupStateVerParamOnQueryString(LBHttpSolrServer lbServer) { - // setup the stateVer param to be passed in the query string of every request - Set queryStringParams = lbServer.getQueryParams(); - if (queryStringParams == null) { - queryStringParams = new HashSet(2); - lbServer.setQueryParams(queryStringParams); - } - queryStringParams.add(STATE_VERSION); - } - public ResponseParser getParser() { return lbServer.getParser(); } @@ -306,7 +238,8 @@ public class CloudSolrServer extends SolrServer { if (zkStateReader == null) { ZkStateReader zk = null; try { - zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout); + zk = new ZkStateReader(zkHost, zkClientTimeout, + zkConnectTimeout); zk.createClusterStateWatchersAndUpdate(); zkStateReader = zk; } catch (InterruptedException e) { @@ -369,7 +302,7 @@ public class CloudSolrServer extends SolrServer { } } - DocCollection col = getDocCollection(clusterState, collection); + DocCollection col = clusterState.getCollection(collection); DocRouter router = col.getRouter(); @@ -586,146 +519,7 @@ public class CloudSolrServer extends SolrServer { } @Override - public NamedList request(SolrRequest request) throws SolrServerException, IOException { - SolrParams reqParams = request.getParams(); - String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection(); - return requestWithRetryOnStaleState(request, 0, collection); - } - - /** - * As this class doesn't watch external collections on the client side, - * there's a chance that the request will fail due to cached stale state, - * which means the state must be refreshed from ZK and retried. - */ - protected NamedList requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection) - throws SolrServerException, IOException { - - connect(); // important to call this before you start working with the ZkStateReader - - // build up a _stateVer_ param to pass to the server containing all of the - // external collection state versions involved in this request, which allows - // the server to notify us that our cached state for one or more of the external - // collections is stale and needs to be refreshed ... this code has no impact on internal collections - String stateVerParam = null; - List requestedCollections = null; - if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests - Set requestedCollectionNames = getCollectionList(getZkStateReader().getClusterState(), collection); - - StringBuilder stateVerParamBuilder = null; - for (String requestedCollection : requestedCollectionNames) { - // track the version of state we're using on the client side using the _stateVer_ param - DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection); - int collVer = coll.getZNodeVersion(); - if (coll.getStateFormat()>1) { - if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size()); - requestedCollections.add(coll); - - if (stateVerParamBuilder == null) { - stateVerParamBuilder = new StringBuilder(); - } else { - stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name - } - - stateVerParamBuilder.append(coll.getName()).append(":").append(collVer); - } - } - - if (stateVerParamBuilder != null) { - stateVerParam = stateVerParamBuilder.toString(); - } - } - - if (request.getParams() instanceof ModifiableSolrParams) { - ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); - if (stateVerParam != null) { - params.set(STATE_VERSION, stateVerParam); - } else { - params.remove(STATE_VERSION); - } - } // else: ??? how to set this ??? - - NamedList resp = null; - try { - resp = sendRequest(request); - } catch (Exception exc) { - - Throwable rootCause = SolrException.getRootCause(exc); - // don't do retry support for admin requests or if the request doesn't have a collection specified - if (collection == null || request.getPath().startsWith("/admin")) { - if (exc instanceof SolrServerException) { - throw (SolrServerException)exc; - } else if (exc instanceof IOException) { - throw (IOException)exc; - }else if (exc instanceof RuntimeException) { - throw (RuntimeException) exc; - } - else { - throw new SolrServerException(rootCause); - } - } - - int errorCode = (rootCause instanceof SolrException) ? - ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code; - - log.error("Request to collection {} failed due to ("+errorCode+ - ") {}, retry? "+retryCount, collection, rootCause.toString()); - - boolean wasCommError = - (rootCause instanceof ConnectException || - rootCause instanceof ConnectTimeoutException || - rootCause instanceof NoHttpResponseException || - rootCause instanceof SocketException); - - boolean stateWasStale = false; - if (retryCount < MAX_STALE_RETRIES && - !requestedCollections.isEmpty() && - SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE) - { - // cached state for one or more external collections was stale - // re-issue request using updated state - stateWasStale = true; - - // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence - for (DocCollection ext : requestedCollections) { - collectionStateCache.remove(ext.getName()); - } - } - - // if we experienced a communication error, it's worth checking the state - // with ZK just to make sure the node we're trying to hit is still part of the collection - if (retryCount < MAX_STALE_RETRIES && !stateWasStale && !requestedCollections.isEmpty() && wasCommError) { - for (DocCollection ext : requestedCollections) { - DocCollection latestStateFromZk = getZkStateReader().getCollection(ext.getName()); - if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) { - // looks like we couldn't reach the server because the state was stale == retry - stateWasStale = true; - // we just pulled state from ZK, so update the cache so that the retry uses it - collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk)); - } - } - } - - requestedCollections.clear(); // done with this - - // if the state was stale, then we retry the request once with new state pulled from Zk - if (stateWasStale) { - log.warn("Re-trying request to collection(s) "+collection+" after stale state error from server."); - resp = requestWithRetryOnStaleState(request, retryCount+1, collection); - } else { - if (exc instanceof SolrServerException) { - throw (SolrServerException)exc; - } else if (exc instanceof IOException) { - throw (IOException)exc; - } else { - throw new SolrServerException(rootCause); - } - } - } - - return resp; - } - - protected NamedList sendRequest(SolrRequest request) + public NamedList request(SolrRequest request) throws SolrServerException, IOException { connect(); @@ -785,7 +579,7 @@ public class CloudSolrServer extends SolrServer { // add it to the Map of slices. Map slices = new HashMap<>(); for (String collectionName : collectionsList) { - DocCollection col = getDocCollection(clusterState, collectionName); + DocCollection col = clusterState.getCollection(collectionName); Collection routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col); ClientUtils.addSlices(slices, collectionName, routeSlices, true); } @@ -877,17 +671,14 @@ public class CloudSolrServer extends SolrServer { Aliases aliases = zkStateReader.getAliases(); String alias = aliases.getCollectionAlias(collectionName); if (alias != null) { - List aliasList = StrUtils.splitSmart(alias, ",", true); + List aliasList = StrUtils.splitSmart(alias, ",", true); collectionsList.addAll(aliasList); continue; } - - DocCollection docCollection = getDocCollection(clusterState, collection); - if (docCollection == null) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName); - } + + throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName); } - + collectionsList.add(collectionName); } return collectionsList; @@ -924,28 +715,6 @@ public class CloudSolrServer extends SolrServer { return updatesToLeaders; } - protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException { - ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null; - if (cachedState != null && cachedState.cached != null) { - return cachedState.cached; - } - - DocCollection col = clusterState.getCollectionOrNull(collection); - if(col == null ) return null; - collectionStateCache.put(collection, new ExpiringCachedDocCollection(col)); - return col; - } - - /** - * Extension point to allow sub-classes to override the ZkStateReader this class uses internally. - */ - protected ZkStateReader createZkStateReader(String zkHost, int zkClientTimeout, int zkConnectTimeout) - throws InterruptedException, TimeoutException, IOException, KeeperException { - ZkStateReader zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout); - zk.createClusterStateWatchersAndUpdate(); - return zk; - } - /** * Useful for determining the minimum achieved replication factor across * all shards involved in processing an update request, typically useful diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java index d8fd1fcc65b..e4a846520ab 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java @@ -46,25 +46,36 @@ public class ClusterState implements JSONWriter.Writable { private final Map collectionStates; // Map> private Set liveNodes; - private final ZkStateReader stateReader; + + /** + * Use this constr when ClusterState is meant for publication. + * + * hashCode and equals will only depend on liveNodes and not clusterStateVersion. + */ + @Deprecated + public ClusterState(Set liveNodes, + Map collectionStates) { + this(null, liveNodes, collectionStates); + } + + + /** * Use this constr when ClusterState is meant for consumption. */ public ClusterState(Integer zkClusterStateVersion, Set liveNodes, - Map collectionStates, ZkStateReader stateReader) { - assert stateReader != null; + Map collectionStates) { this.zkClusterStateVersion = zkClusterStateVersion; this.liveNodes = new HashSet<>(liveNodes.size()); this.liveNodes.addAll(liveNodes); this.collectionStates = new LinkedHashMap<>(collectionStates.size()); this.collectionStates.putAll(collectionStates); - this.stateReader = stateReader; } public ClusterState copyWith(Map modified){ - ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates,stateReader); + ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates); for (Entry e : modified.entrySet()) { DocCollection c = e.getValue(); if(c == null) { @@ -87,10 +98,17 @@ public class ClusterState implements JSONWriter.Writable { if (slice == null) return null; return slice.getLeader(); } + private Replica getReplica(DocCollection coll, String replicaName) { + if (coll == null) return null; + for (Slice slice : coll.getSlices()) { + Replica replica = slice.getReplica(replicaName); + if (replica != null) return replica; + } + return null; + } public boolean hasCollection(String coll) { - if (collectionStates.containsKey(coll)) return true; - return stateReader.getAllCollections().contains(coll); + return collectionStates.containsKey(coll) ; } /** @@ -98,9 +116,8 @@ public class ClusterState implements JSONWriter.Writable { * If the slice is known, do not use this method. * coreNodeName is the same as replicaName */ - public Replica getReplica(final String collection, final String coreNodeName, boolean cachedOnly) { - DocCollection coll = stateReader.getCollection(collection,cachedOnly); - return coll == null? null: coll.getReplica(coreNodeName); + public Replica getReplica(final String collection, final String coreNodeName) { + return getReplica(collectionStates.get(collection), coreNodeName); } /** @@ -136,35 +153,6 @@ public class ClusterState implements JSONWriter.Writable { return coll.getActiveSlices(); } - /** - * Get the {@code DocCollection} object if available. This method will - * never hit ZooKeeper and attempt to fetch collection from locally available - * state only. - * - * @param collection the name of the collection - * @return the {@link org.apache.solr.common.cloud.DocCollection} or null if not found - */ - private DocCollection getCachedCollection(String collection) { - DocCollection c = collectionStates.get(collection); - if (c != null) return c; - if (!stateReader.getAllCollections().contains(collection)) return null; - return stateReader.getCollection(collection, true); // return from cache - } - - /** expert internal use only - * Gets the replica from caches by the core name (assuming the slice is unknown) or null if replica is not found. - * If the slice is known, do not use this method. - * coreNodeName is the same as replicaName - */ - public Replica getCachedReplica(String collectionName, String coreNodeName) { - DocCollection c = getCachedCollection(collectionName); - if (c == null) return null; - for (Slice slice : c.getSlices()) { - Replica replica = slice.getReplica(coreNodeName); - if (replica != null) return replica; - } - return null; - } /** * Get the named DocCollection object, or throw an exception if it doesn't exist. @@ -175,26 +163,23 @@ public class ClusterState implements JSONWriter.Writable { return coll; } + public DocCollection getCollectionOrNull(String coll) { - DocCollection c = collectionStates.get(coll); - if (c != null) return c; - if (!stateReader.getAllCollections().contains(coll)) return null; - return stateReader.getCollection(coll); + return collectionStates.get(coll); } /** * Get collection names. */ public Set getCollections() { - return stateReader.getAllCollections(); + return collectionStates.keySet(); } /** - * For internal use only * @return Map<collectionName, Map<sliceName,Slice>> */ - Map getCollectionStates() { - return collectionStates; + public Map getCollectionStates() { + return Collections.unmodifiableMap(collectionStates); } /** @@ -253,7 +238,7 @@ public class ClusterState implements JSONWriter.Writable { Stat stat = new Stat(); byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE, null, stat, true); - return load(stat.getVersion(), state, liveNodes, stateReader, ZkStateReader.CLUSTER_STATE); + return load(stat.getVersion(), state, liveNodes); } @@ -265,34 +250,25 @@ public class ClusterState implements JSONWriter.Writable { * @param version zk version of the clusterstate.json file (bytes) * @param bytes clusterstate.json as a byte array * @param liveNodes list of live nodes - * @param stateReader The ZkStateReader for this clusterstate - * @param znode the znode from which this data is read from * @return the ClusterState */ - public static ClusterState load(Integer version, byte[] bytes, Set liveNodes, ZkStateReader stateReader, String znode) { + public static ClusterState load(Integer version, byte[] bytes, Set liveNodes) { // System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes))); if (bytes == null || bytes.length == 0) { - return new ClusterState(version, liveNodes, Collections.emptyMap(),stateReader); + return new ClusterState(version, liveNodes, Collections.emptyMap()); } Map stateMap = (Map) ZkStateReader.fromJSON(bytes); Map collections = new LinkedHashMap<>(stateMap.size()); for (Entry entry : stateMap.entrySet()) { String collectionName = entry.getKey(); - DocCollection coll = collectionFromObjects(collectionName, (Map)entry.getValue(), version, znode); + DocCollection coll = collectionFromObjects(collectionName, (Map)entry.getValue(), version); collections.put(collectionName, coll); } // System.out.println("######## ClusterState.load result:" + collections); - return new ClusterState( version, liveNodes, collections,stateReader); + return new ClusterState( version, liveNodes, collections); } - /** - * @deprecated use {@link #load(Integer, byte[], Set, ZkStateReader, String)} - */ - @Deprecated - public static ClusterState load(Integer version, byte[] bytes, Set liveNodes){ - return load(version == null ? -1: version, bytes, liveNodes,null,null); - } public static Aliases load(byte[] bytes) { if (bytes == null || bytes.length == 0) { @@ -303,7 +279,7 @@ public class ClusterState implements JSONWriter.Writable { return new Aliases(aliasMap); } - private static DocCollection collectionFromObjects(String name, Map objs, Integer version, String znode) { + private static DocCollection collectionFromObjects(String name, Map objs, Integer version) { Map props; Map slices; @@ -330,7 +306,7 @@ public class ClusterState implements JSONWriter.Writable { router = DocRouter.getDocRouter((String) routerProps.get("name")); } - return new DocCollection(name, slices, props, router, version,znode); + return new DocCollection(name, slices, props, router, version); } private static Map makeSlices(Map genericSlices) { @@ -358,7 +334,7 @@ public class ClusterState implements JSONWriter.Writable { * * @return null if ClusterState was created for publication, not consumption */ - public Integer getZnodeVersion() { + public Integer getZkClusterStateVersion() { return zkClusterStateVersion; } @@ -388,9 +364,6 @@ public class ClusterState implements JSONWriter.Writable { } - public ZkStateReader getStateReader(){ - return stateReader; - } /** * Internal API used only by ZkStateReader @@ -399,5 +372,9 @@ public class ClusterState implements JSONWriter.Writable { this.liveNodes = liveNodes; } + public DocCollection getCommonCollection(String name){ + return collectionStates.get(name); + + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index c96bdfedd73..a02dfd5e174 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -21,6 +21,7 @@ import org.noggit.JSONUtil; import org.noggit.JSONWriter; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -32,17 +33,15 @@ import java.util.Map; public class DocCollection extends ZkNodeProps { public static final String DOC_ROUTER = "router"; public static final String SHARDS = "shards"; - public static final String STATE_FORMAT = "stateFormat"; - private int znodeVersion; + private int version; private final String name; private final Map slices; private final Map activeSlices; private final DocRouter router; - private final String znode; public DocCollection(String name, Map slices, Map props, DocRouter router) { - this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE); + this(name, slices, props, router, -1); } /** @@ -50,9 +49,9 @@ public class DocCollection extends ZkNodeProps { * @param slices The logical shards of the collection. This is used directly and a copy is not made. * @param props The properties of the slice. This is used directly and a copy is not made. */ - public DocCollection(String name, Map slices, Map props, DocRouter router, int zkVersion, String znode) { + public DocCollection(String name, Map slices, Map props, DocRouter router, int zkVersion) { super( props==null ? props = new HashMap() : props); - this.znodeVersion = zkVersion; + this.version = zkVersion; this.name = name; this.slices = slices; @@ -66,12 +65,8 @@ public class DocCollection extends ZkNodeProps { this.activeSlices.put(slice.getKey(), slice.getValue()); } this.router = router; - this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode; - assert name != null && slices != null; - } - public DocCollection copyWith(Map slices){ - return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode); + assert name != null && slices != null; } @@ -115,16 +110,9 @@ public class DocCollection extends ZkNodeProps { return activeSlices; } - public int getZNodeVersion(){ - return znodeVersion; - } + public int getVersion(){ + return version; - public int getStateFormat(){ - return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1:2; - } - - public String getZNode(){ - return znode; } @@ -144,12 +132,4 @@ public class DocCollection extends ZkNodeProps { all.put(SHARDS, slices); jsonWriter.write(all); } - - public Replica getReplica(String coreNodeName) { - for (Slice slice : slices.values()) { - Replica replica = slice.getReplica(coreNodeName); - if (replica != null) return replica; - } - return null; - } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java index 518d345f9ed..d8ea26859ed 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -564,7 +564,6 @@ public class SolrZkClient { } public void close() { -// log.warn("closed inst :"+inst, new Exception("leakdebug")); if (isClosed) return; // it's okay if we over close - same as solrcore isClosed = true; try { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 503daa22690..a7b784c3bd4 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -98,16 +98,10 @@ public class ZkStateReader { public static final String LEADER_ELECT_ZKNODE = "/leader_elect"; public static final String SHARD_LEADERS_ZKNODE = "leaders"; - private final Set watchedCollections = new HashSet(); - /**These are collections which are actively watched by this instance . - * - */ - private Map watchedCollectionStates = new ConcurrentHashMap(); - private Set allCollections = Collections.emptySet(); - + // // convenience methods... should these go somewhere else? // @@ -168,8 +162,7 @@ public class ZkStateReader { log.info("path={} {}={} specified config exists in ZooKeeper", new Object[] {path, CONFIGNAME_PROP, configName}); } - } else { - throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path); + } } catch (KeeperException e) { @@ -258,21 +251,22 @@ public class ZkStateReader { return aliases; } - public Boolean checkValid(String coll, int version){ + /*public Boolean checkValid(String coll, int version){ DocCollection collection = clusterState.getCollectionOrNull(coll); if(collection ==null) return null; - if(collection.getZNodeVersion() < version){ - log.info("server older than client {}<{}",collection.getZNodeVersion(),version); - DocCollection nu = getCollectionLive(this, coll); - if(nu.getZNodeVersion()> collection.getZNodeVersion()){ - updateWatchedCollection(nu); + if(collection.getVersion() < version){ + log.info("server older than client {}<{}",collection.getVersion(),version); + DocCollection nu = getExternCollectionFresh(this, coll); + if(nu.getVersion()> collection.getVersion()){ + updateExternCollection(nu); collection = nu; } } - if(collection.getZNodeVersion() == version) return Boolean.TRUE; - log.debug("wrong version from client {}!={} ",version, collection.getZNodeVersion()); + if(collection.getVersion() == version) return Boolean.TRUE; + log.info("wrong version from client {}!={} ",version, collection.getVersion()); return Boolean.FALSE; - } + + }*/ public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException, InterruptedException { @@ -305,11 +299,10 @@ public class ZkStateReader { byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat , true); Set ln = ZkStateReader.this.clusterState.getLiveNodes(); - ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln,ZkStateReader.this, null); + ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln); // update volatile ZkStateReader.this.clusterState = clusterState; - updateCollectionNames(); // HashSet all = new HashSet<>(colls);; // all.addAll(clusterState.getAllInternalCollections()); // all.remove(null); @@ -384,7 +377,6 @@ public class ZkStateReader { liveNodeSet.addAll(liveNodes); ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this); this.clusterState = clusterState; - updateCollectionNames(); zkClient.exists(ALIASES, new Watcher() { @@ -430,40 +422,6 @@ public class ZkStateReader { }, true); } updateAliases(); - //on reconnect of SolrZkClient re-add watchers for the watched external collections - synchronized (this) { - for (String watchedCollection : watchedCollections) { - addZkWatch(watchedCollection); - } - } - } - - public void updateCollectionNames() throws KeeperException, InterruptedException { - Set colls = getExternColls(); - colls.addAll(clusterState.getCollectionStates().keySet()); - allCollections = Collections.unmodifiableSet(colls); - } - - private Set getExternColls() throws KeeperException, InterruptedException { - List children = null; - try { - children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true); - } catch (KeeperException.NoNodeException e) { - log.warn("Error fetching collection names"); - - return new HashSet<>(); - } - if (children == null || children.isEmpty()) return new HashSet<>(); - HashSet result = new HashSet<>(children.size()); - - for (String c : children) { - try { - if (zkClient.exists(getCollectionPath(c), true)) result.add(c); - } catch (Exception e) { - log.warn("Error reading collections nodes", e); - } - } - return result; } @@ -482,7 +440,7 @@ public class ZkStateReader { liveNodesSet.addAll(liveNodes); if (!onlyLiveNodes) { - log.debug("Updating cloud state from ZooKeeper... "); + log.info("Updating cloud state from ZooKeeper... "); clusterState = ClusterState.load(zkClient, liveNodesSet,this); } else { @@ -491,7 +449,6 @@ public class ZkStateReader { clusterState.setLiveNodes(liveNodesSet); } this.clusterState = clusterState; - updateCollectionNames(); } } else { @@ -550,13 +507,9 @@ public class ZkStateReader { } }, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS); } - synchronized (this) { - for (String watchedCollection : watchedCollections) { - watchedCollectionStates.put(watchedCollection, getCollectionLive(this, watchedCollection)); - } - } - } + } + /** * @return information about the cluster from ZooKeeper */ @@ -679,9 +632,6 @@ public class ZkStateReader { public SolrZkClient getZkClient() { return zkClient; } - public Set getAllCollections(){ - return allCollections; - } public void updateAliases() throws KeeperException, InterruptedException { byte[] data = zkClient.getData(ALIASES, null, null, true); @@ -728,167 +678,4 @@ public class ZkStateReader { } } - public void updateWatchedCollection(DocCollection c) { - if(watchedCollections.contains(c.getName())){ - watchedCollectionStates.put(c.getName(), c); - log.info("Updated DocCollection "+c.getName()+" to: "); - } - } - - /** - * Advance usage - * This method can be used to fetch a collection object and control whether it hits - * the cache only or if information can be looked up from ZooKeeper. - * - * @param coll the collection name - * @param cachedCopyOnly whether to fetch data from cache only or if hitting Zookeeper is acceptable - * @return the {@link org.apache.solr.common.cloud.DocCollection} - */ - public DocCollection getCollection(String coll, boolean cachedCopyOnly) { - if(clusterState.getCollectionStates().get(coll) != null) { - //this collection resides in clusterstate.json. So it's always up-to-date - return clusterState.getCollectionStates().get(coll); - } - if (watchedCollections.contains(coll) || cachedCopyOnly) { - DocCollection c = watchedCollectionStates.get(coll); - if (c != null || cachedCopyOnly) return c; - } - return getCollectionLive(this, coll); - } - - private Map ephemeralCollectionData; - - /** - * this is only set by Overseer not to be set by others and only set inside the Overseer node. If Overseer has - unfinished external collections which are yet to be persisted to ZK - this map is populated and this class can use that information - @param map The map reference - */ - public void setEphemeralCollectionData(Map map){ - ephemeralCollectionData = map; - } - - public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) { - String collectionPath = getCollectionPath(coll); - if(zkStateReader.ephemeralCollectionData !=null ){ - ClusterState cs = (ClusterState) zkStateReader.ephemeralCollectionData.get(collectionPath); - if(cs !=null) { - return cs.getCollectionStates().get(coll); - } - } - try { - if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null; - Stat stat = new Stat(); - byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true); - ClusterState state = ClusterState.load(stat.getVersion(), data, Collections.emptySet(), zkStateReader, collectionPath); - return state.getCollectionStates().get(coll); - } catch (KeeperException.NoNodeException e) { - log.warn("No node available : " + collectionPath, e); - return null; - } catch (KeeperException e) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e); - } - } - - public DocCollection getCollection(String coll) { - return getCollection(coll, false); - } - - public static String getCollectionPath(String coll) { - return COLLECTIONS_ZKNODE+"/"+coll + "/state.json"; - } - - public void addCollectionWatch(String coll) throws KeeperException, InterruptedException { - synchronized (this){ - if(watchedCollections.contains(coll)) return; - else { - watchedCollections.add(coll); - } - addZkWatch(coll); - } - - } - - private void addZkWatch(final String coll) throws KeeperException, InterruptedException { - log.info("addZkWatch {}", coll); - final String fullpath = getCollectionPath(coll); - synchronized (getUpdateLock()){ - - cmdExecutor.ensureExists(fullpath, zkClient); - log.info("Updating collection state at {} from ZooKeeper... ",fullpath); - - Watcher watcher = new Watcher() { - - @Override - public void process(WatchedEvent event) { - // session events are not change events, - // and do not remove the watcher - if (EventType.None.equals(event.getType())) { - return; - } - log.info("A cluster state change: {}, has occurred - updating... ", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size()); - try { - - // delayed approach - // ZkStateReader.this.updateClusterState(false, false); - synchronized (ZkStateReader.this.getUpdateLock()) { - if(!watchedCollections.contains(coll)) { - log.info("Unwatched collection {}",coll); - return; - } - // remake watch - final Watcher thisWatch = this; - Stat stat = new Stat(); - byte[] data = zkClient.getData(fullpath, thisWatch, stat, true); - - if(data == null || data.length ==0){ - log.warn("No value set for collection state : {}", coll); - return; - - } - ClusterState clusterState = ClusterState.load(stat.getVersion(), data, Collections.emptySet(),ZkStateReader.this,fullpath); - // update volatile - - DocCollection newState = clusterState.getCollectionStates().get(coll); - watchedCollectionStates.put(coll, newState); - log.info("Updating data for {} to ver {} ", coll , newState.getZNodeVersion()); - - } - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED - || e.code() == KeeperException.Code.CONNECTIONLOSS) { - log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); - return; - } - log.error("Unwatched collection :"+coll , e); - throw new ZooKeeperException(ErrorCode.SERVER_ERROR, - "", e); - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("Unwatched collection :"+coll , e); - return; - } - } - - }; - zkClient.exists(fullpath, watcher, true); - } - - watchedCollectionStates.put(coll, getCollectionLive(this, coll)); - } - - /**This is not a public API. Only used by ZkController */ - public void removeZKWatch(final String coll){ - synchronized (this){ - watchedCollections.remove(coll); - } - } - - - - } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java index a4b6776d133..82ce79df9a6 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java @@ -20,6 +20,7 @@ package org.apache.solr.client.solrj.impl; import java.io.File; import java.io.IOException; import java.net.MalformedURLException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; @@ -41,6 +42,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.AbstractFullDistribZkTestBase; import org.apache.solr.cloud.AbstractZkTestCase; +import org.apache.solr.cloud.Overseer; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; @@ -49,6 +51,7 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; @@ -59,6 +62,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,7 +125,6 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase { @Override public void doTest() throws Exception { allTests(); - stateVersionParamTest(); } private void allTests() throws Exception { @@ -343,77 +346,7 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase { SolrInputDocument doc = getDoc(fields); indexDoc(doc); } - - private void stateVersionParamTest() throws Exception { - CloudSolrServer client = createCloudClient(null); - try { - String collectionName = "checkStateVerCol"; - createCollection(collectionName, client, 2, 2); - waitForRecoveriesToFinish(collectionName, false); - DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName); - Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next(); - - HttpSolrServer httpSolrServer = new HttpSolrServer(r.getStr(ZkStateReader.BASE_URL_PROP) + "/"+collectionName); - - - SolrQuery q = new SolrQuery().setQuery("*:*"); - - log.info("should work query, result {}", httpSolrServer.query(q)); - //no problem - q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion()); - log.info("2nd query , result {}", httpSolrServer.query(q)); - //no error yet good - - q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+ (coll.getZNodeVersion() -1)); //an older version expect error - - HttpSolrServer.RemoteSolrException sse = null; - try { - httpSolrServer.query(q); - log.info("expected query error"); - } catch (HttpSolrServer.RemoteSolrException e) { - sse = e; - } - httpSolrServer.shutdown(); - assertNotNull(sse); - assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code); - - //now send the request to another node that does n ot serve the collection - - Set allNodesOfColl = new HashSet<>(); - for (Slice slice : coll.getSlices()) { - for (Replica replica : slice.getReplicas()) { - allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP)); - } - } - String theNode = null; - for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) { - String n = client.getZkStateReader().getBaseUrlForNodeName(s); - if(!allNodesOfColl.contains(s)){ - theNode = n; - break; - } - } - log.info("thenode which does not serve this collection{} ",theNode); - assertNotNull(theNode); - httpSolrServer = new HttpSolrServer(theNode + "/"+collectionName); - - q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion()); - - try { - httpSolrServer.query(q); - log.info("error was expected"); - } catch (HttpSolrServer.RemoteSolrException e) { - sse = e; - } - httpSolrServer.shutdown(); - assertNotNull(sse); - assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code); - } finally { - client.shutdown(); - } - - } - + public void testShutdown() throws MalformedURLException { CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332"); try { diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java index 5b2e63acdb3..24f1ee29b86 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java @@ -339,19 +339,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes return createJettys(numJettys, false); } - protected int defaultStateFormat = 1 + random().nextInt(2); - - protected int getStateFormat() { - String stateFormat = System.getProperty("tests.solr.stateFormat", null); - if (stateFormat != null) { - if ("2".equals(stateFormat)) { - return defaultStateFormat = 2; - } else if ("1".equals(stateFormat)) { - return defaultStateFormat = 1; - } - } - return defaultStateFormat; // random - } /** * @param checkCreatedVsState @@ -364,17 +351,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes List clients = new ArrayList<>(); StringBuilder sb = new StringBuilder(); - if(getStateFormat() == 2) { - log.info("Creating collection1 with stateFormat=2"); - SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT); - Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(ZkNodeProps.makeMap( - Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.CREATECOLLECTION, - "name", DEFAULT_COLLECTION, - "numShards", String.valueOf(sliceCount), - DocCollection.STATE_FORMAT, getStateFormat()))); - zkClient.close(); - } - for (int i = 1; i <= numJettys; i++) { if (sb.length() > 0) sb.append(','); int cnt = this.jettyIntCntr.incrementAndGet(); @@ -1514,10 +1490,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes collectionInfos.put(collectionName, list); } params.set("name", collectionName); - if (getStateFormat() == 2) { - log.info("Creating collection with stateFormat=2: " + collectionName); - params.set(DocCollection.STATE_FORMAT, "2"); - } SolrRequest request = new QueryRequest(params); request.setPath("/admin/collections");