reverting SOLR-5473 , SOLR-5474

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1607587 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2014-07-03 11:13:35 +00:00
parent 3b6e78cf3b
commit 2f28cc16e0
20 changed files with 183 additions and 1033 deletions

View File

@ -70,10 +70,6 @@ New Features
* SOLR-6103: Added DateRangeField for indexing date ranges, especially
multi-valued ones. Based on LUCENE-5648. (David Smiley)
* SOLR-5473: Make one state.json per collection (noble, shalin, Timothy Potter ,Jessica Cheng, Anshum Gupta, Mark Miller)
* SOLR-5474: Add stateFormat=2 support to CloudSolrServer (Timothy Potter , noble , Jessica Cheng)
Other Changes
----------------------

View File

@ -263,7 +263,7 @@ sb.append("(group_name=").append(tg.getName()).append(")");
private Map<String,Object> getReplicaProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
Replica replica = zkController.getClusterState().getReplica(collection, core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(),true);
Replica replica = zkController.getClusterState().getReplica(collection, core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
if(replica!=null) {
return replica.getProperties();
}

View File

@ -33,7 +33,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -105,8 +104,6 @@ public class Overseer {
private Map clusterProps;
private boolean isClosed = false;
private final Map<String, Object> updateNodes = new ConcurrentHashMap<>();
private boolean isClusterStateModified = false;
public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
@ -120,7 +117,6 @@ public class Overseer {
this.myId = myId;
this.reader = reader;
clusterProps = reader.getClusterProps();
reader.setEphemeralCollectionData(Collections.unmodifiableMap(updateNodes));
}
public Stats getStateUpdateQueueStats() {
@ -261,7 +257,6 @@ public class Overseer {
stateUpdateQueue.poll();
if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
if(!updateNodes.isEmpty()) break;
// if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100);
}
@ -301,28 +296,8 @@ public class Overseer {
TimerContext timerContext = stats.time("update_state");
boolean success = false;
try {
if(!updateNodes.isEmpty()) {
for (Entry<String, Object> e : updateNodes.entrySet()) {
if (e.getValue() == null) {
if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true);
} else {
if (zkClient.exists(e.getKey(), true)) {
log.info("going to update_collection {}", e.getKey());
zkClient.setData(e.getKey(), ZkStateReader.toJSON(e.getValue()), true);
} else {
log.info("going to create_collection {}", e.getValue());
zkClient.create(e.getKey(), ZkStateReader.toJSON(e.getValue()), CreateMode.PERSISTENT, true);
}
}
}
updateNodes.clear();
}
if(isClusterStateModified) {
lastUpdatedTime = System.nanoTime();
zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true);
isClusterStateModified = false;
}
lastUpdatedTime = System.nanoTime();
success = true;
} finally {
timerContext.stop();
@ -891,15 +866,7 @@ public class Overseer {
collectionProps.put(DocCollection.DOC_ROUTER, routerSpec);
if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true");
String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null : ZkStateReader.getCollectionPath(collectionName);
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router , -1,znode);
isClusterStateModified = true;
log.info("state version {} {}", collectionName, newCollection.getStateFormat());
if (newCollection.getStateFormat() > 1) {
updateNodes.put(ZkStateReader.getCollectionPath(collectionName),
new ClusterState(-1, Collections.<String>emptySet(), singletonMap(newCollection.getName(), newCollection), state.getStateReader()));
return state;
}
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
return newState(state, singletonMap(newCollection.getName(), newCollection));
}
@ -942,23 +909,26 @@ public class Overseer {
private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
// System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
// System.out.println("Updating slice:" + slice);
DocCollection newCollection = null;
DocCollection coll = state.getCollectionOrNull(collectionName) ;
Map<String,Slice> slices;
Map<String,Object> props;
DocRouter router;
if (coll == null) {
// when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
// without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
slices = new LinkedHashMap<>(1);
slices.put(slice.getName(), slice);
Map<String,Object> props = new HashMap<>(1);
slices = new HashMap<>(1);
props = new HashMap<>(1);
props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
router = new ImplicitDocRouter();
} else {
props = coll.getProperties();
router = coll.getRouter();
slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy
slices.put(slice.getName(), slice);
newCollection = coll.copyWith(slices);
}
slices.put(slice.getName(), slice);
DocCollection newCollection = new DocCollection(collectionName, slices, props, router);
// System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
@ -1017,28 +987,12 @@ public class Overseer {
}
DocCollection newCollection = coll.copyWith(slices);
DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter());
return newState(state, singletonMap(collectionName, newCollection));
}
private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
for (Entry<String, DocCollection> e : colls.entrySet()) {
DocCollection c = e.getValue();
if (c == null) {
isClusterStateModified = true;
state = state.copyWith(singletonMap(e.getKey(), (DocCollection) null));
continue;
}
if (c.getStateFormat() >1) {
state.getStateReader().updateWatchedCollection(c);
updateNodes.put(ZkStateReader.getCollectionPath(c.getName()), new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c), state.getStateReader()));
} else {
isClusterStateModified = true;
state = state.copyWith(singletonMap(e.getKey(), c));
}
}
return state;
return state.copyWith(colls);
}
/*
@ -1049,22 +1003,11 @@ public class Overseer {
if (!checkKeyExistence(message, "name")) return clusterState;
DocCollection coll = clusterState.getCollectionOrNull(collection);
if(coll !=null) {
isClusterStateModified = true;
if(coll.getStateFormat()>1){
try {
log.info("Deleting state for collection : {}", collection);
zkClient.delete(ZkStateReader.getCollectionPath(collection),-1,true);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"Unable to remove collection state :"+collection);
}
return clusterState;
} else{
return clusterState.copyWith(singletonMap(collection,(DocCollection)null));
}
}
return clusterState;
}
/*
* Remove collection slice from cloudstate
*/
@ -1080,7 +1023,7 @@ public class Overseer {
Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlicesMap());
newSlices.remove(sliceId);
DocCollection newCollection = coll.copyWith(newSlices);
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
return newState(clusterState, singletonMap(collection,newCollection));
}
@ -1092,6 +1035,8 @@ public class Overseer {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return clusterState;
// final Map<String, DocCollection> newCollections = new LinkedHashMap<>(clusterState.getCollectionStates()); // shallow copy
// DocCollection coll = newCollections.get(collection);
DocCollection coll = clusterState.getCollectionOrNull(collection) ;
if (coll == null) {
// TODO: log/error that we didn't find it?
@ -1146,7 +1091,7 @@ public class Overseer {
// but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
// ZkController out of the Overseer.
try {
zkClient.delete("/collections/" + collection,-1,true);
zkClient.clean("/collections/" + collection);
} catch (InterruptedException e) {
SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
Thread.currentThread().interrupt();
@ -1156,7 +1101,7 @@ public class Overseer {
return newState(clusterState,singletonMap(collection, (DocCollection) null));
} else {
DocCollection newCollection = coll.copyWith(newSlices);
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
return newState(clusterState,singletonMap(collection,newCollection));
}

View File

@ -707,13 +707,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
Set<String> collections = clusterState.getCollections();
for (String name : collections) {
Map<String, Object> collectionStatus = null;
if (clusterState.getCollection(name).getStateFormat()>1) {
bytes = ZkStateReader.toJSON(clusterState.getCollection(name));
Map<String, Object> docCollection = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
collectionStatus = getCollectionStatus(docCollection, name, shard);
} else {
collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, shard);
}
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
collectionStatus.put("aliases", collectionVsAliases.get(name));
}
@ -722,12 +716,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
} else {
String routeKey = message.getStr(ShardParams._ROUTE_);
Map<String, Object> docCollection = null;
if (clusterState.getCollection(collection).getStateFormat()>1 ) {
bytes = ZkStateReader.toJSON(clusterState.getCollection(collection));
docCollection = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
} else {
docCollection = (Map<String, Object>) stateMap.get(collection);
}
if (routeKey == null) {
Map<String, Object> collectionStatus = getCollectionStatus(docCollection, collection, shard);
if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty()) {

View File

@ -1109,16 +1109,6 @@ public final class ZkController {
}
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
boolean removeWatch = true;
for (SolrCore solrCore : cc.getCores()) {//if there is no SolrCoe which is a member of this collection, remove the watch
CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor();
if (cloudDesc != null && cloudDescriptor.getCollectionName().equals(cloudDesc.getCollectionName())) {
//means
removeWatch = false;
break;
}
}
if(removeWatch) zkStateReader.removeZKWatch(collection);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(),
@ -1421,10 +1411,6 @@ public final class ZkController {
publish(cd, ZkStateReader.DOWN, false, true);
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
if(collection !=null && collection.getStateFormat() >1 ){
log.info("Registering watch for collection {}",cd.getCloudDescriptor().getCollectionName());
zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
}
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);

View File

@ -477,7 +477,6 @@ public class CollectionsHandler extends RequestHandlerBase {
MAX_SHARDS_PER_NODE,
CREATE_NODE_SET ,
SHARDS_PROP,
DocCollection.STATE_FORMAT,
ASYNC,
"router.");

View File

@ -324,7 +324,6 @@ public class SolrDispatchFilter extends BaseSolrFilter {
String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
// don't proxy for internal update requests
SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
checkStateIsValid(cores, queryParams.get(CloudSolrServer.STATE_VERSION));
if (coreUrl != null
&& queryParams
.get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
@ -380,7 +379,6 @@ public class SolrDispatchFilter extends BaseSolrFilter {
if( "/select".equals( path ) || "/select/".equals( path ) ) {
solrReq = parser.parse( core, path, req );
checkStateIsValid(cores,solrReq.getParams().get(CloudSolrServer.STATE_VERSION));
String qt = solrReq.getParams().get( CommonParams.QT );
handler = core.getRequestHandler( qt );
if( handler == null ) {
@ -470,22 +468,6 @@ public class SolrDispatchFilter extends BaseSolrFilter {
chain.doFilter(request, response);
}
private void checkStateIsValid(CoreContainer cores, String stateVer) {
if(stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware() ){
// many have multiple collections separated by |
String[] pairs = StringUtils.split(stateVer, '|');
for (String pair : pairs) {
String[] pcs = StringUtils.split(pair, ':');
if(pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()){
Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0],Integer.parseInt(pcs[1]));
if(Boolean.TRUE != status){
throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair+ "valid : "+status);
}
}
}
}
}
private void processAliases(SolrQueryRequest solrReq, Aliases aliases,
List<String> collectionsList) {

View File

@ -17,26 +17,6 @@
package org.apache.solr.servlet;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.FastWriter;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.noggit.ObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
@ -49,6 +29,30 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.lucene.util.BytesRef;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.noggit.ObjectBuilder;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.FastWriter;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Zookeeper Info
@ -86,6 +90,7 @@ public final class ZookeeperInfoServlet extends HttpServlet {
String path = params.get("path");
String addr = params.get("addr");
boolean all = "true".equals(params.get("all"));
if (addr != null && addr.length() == 0) {
addr = null;
@ -105,7 +110,6 @@ public final class ZookeeperInfoServlet extends HttpServlet {
ZKPrinter printer = new ZKPrinter(response, out, cores.getZkController(), addr);
printer.detail = detail;
printer.dump = dump;
printer.isTreeView = (params.get("wt") == null); // this is hacky but tree view requests don't come in with the wt set
try {
printer.print(path);
@ -136,8 +140,6 @@ public final class ZookeeperInfoServlet extends HttpServlet {
boolean detail = false;
boolean dump = false;
boolean isTreeView = false;
String addr; // the address passed to us
String keeperAddr; // the address we're connected to
@ -383,47 +385,6 @@ public final class ZookeeperInfoServlet extends HttpServlet {
dataStrErr = "data is not parsable as a utf8 String: " + e.toString();
}
}
// pull in external collections too
if (ZkStateReader.CLUSTER_STATE.equals(path) && !isTreeView) {
SortedMap<String,Object> collectionStates = null;
List<String> children = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, null, true);
java.util.Collections.sort(children);
for (String collection : children) {
String collStatePath = ZkStateReader.getCollectionPath(collection);
String childDataStr = null;
try {
byte[] childData = zkClient.getData(collStatePath, null, null, true);
if (childData != null) {
childDataStr = (new BytesRef(childData)).utf8ToString();
}
} catch (KeeperException.NoNodeException nne) {
// safe to ignore
} catch (Exception childErr) {
log.error("Failed to get "+collStatePath+" due to: "+childErr);
}
if (childDataStr != null) {
if (collectionStates == null) {
// initialize lazily as there may not be any external collections
collectionStates = new TreeMap<>();
// add the internal collections
if (dataStr != null)
collectionStates.putAll((Map<String,Object>)ObjectBuilder.fromJSON(dataStr));
}
// now add in the external collections
Map<String,Object> extColl = (Map<String,Object>)ObjectBuilder.fromJSON(childDataStr);
collectionStates.put(collection, extColl.get(collection));
}
}
if (collectionStates != null) {
CharArr out = new CharArr();
new JSONWriter(out, 2).write(collectionStates);
dataStr = out.toString();
}
}
json.writeString("znode");
json.writeNameSeparator();

View File

@ -234,7 +234,7 @@ public class SolrLogLayout extends Layout {
private Map<String,Object> getReplicaProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
Replica replica = zkController.getZkStateReader().getClusterState(). getReplica(collection, zkController.getCoreNodeName(core.getCoreDescriptor()), true);
Replica replica = zkController.getClusterState().getReplica(collection, zkController.getCoreNodeName(core.getCoreDescriptor()));
if(replica!=null) {
return replica.getProperties();
}

View File

@ -86,7 +86,7 @@ public class AssignTest extends SolrTestCaseJ4 {
collectionStates.put(cname, docCollection);
Set<String> liveNodes = new HashSet<>();
ClusterState state = new ClusterState(-1,liveNodes, collectionStates, ClusterStateTest.getMockZkStateReader(collectionStates.keySet()));
ClusterState state = new ClusterState(-1,liveNodes, collectionStates);
String nodeName = Assign.assignNode("collection1", state);
assertEquals("core_node2", nodeName);

View File

@ -62,10 +62,10 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT));
ZkStateReader zkStateReaderMock = getMockZkStateReader(collectionStates.keySet());
ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates,zkStateReaderMock);
ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
byte[] bytes = ZkStateReader.toJSON(clusterState);
// System.out.println("#################### " + new String(bytes));
ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes,zkStateReaderMock,null);
ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
@ -73,13 +73,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
assertEquals("Poperties not copied properly", replica.getStr("prop1"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop1"));
assertEquals("Poperties not copied properly", replica.getStr("prop2"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop2"));
loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes, getMockZkStateReader(Collections.<String>emptySet()),null );
loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
assertEquals("Should not have collections", 0, loadedClusterState.getCollections().size());
loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes,getMockZkStateReader(Collections.<String>emptySet()),null);
loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
@ -89,13 +89,6 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
public static ZkStateReader getMockZkStateReader(final Set<String> collections) {
ZkStateReader mock = createMock(ZkStateReader.class);
EasyMock.reset(mock);
mock.getAllCollections();
EasyMock.expectLastCall().andAnswer(new IAnswer<Set<String>>() {
@Override
public Set<String> answer() throws Throwable {
return collections;
}
}).anyTimes();
EasyMock.replay(mock);
return mock;

View File

@ -1,119 +0,0 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
public class ExternalCollectionsTest extends AbstractFullDistribZkTestBase {
private CloudSolrServer client;
@BeforeClass
public static void beforeThisClass2() throws Exception {
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
System.setProperty("numShards", Integer.toString(sliceCount));
System.setProperty("solr.xml.persist", "true");
client = createCloudClient(null);
}
@After
public void tearDown() throws Exception {
super.tearDown();
client.shutdown();
}
protected String getSolrXml() {
return "solr-no-core.xml";
}
public ExternalCollectionsTest() {
fixShardCount = true;
sliceCount = 2;
shardCount = 4;
checkCreatedVsState = false;
}
@Override
public void doTest() throws Exception {
testZkNodeLocation();
}
boolean externalColl = false;
@Override
protected int getStateFormat() {
return externalColl ? 2:1;
}
private void testZkNodeLocation() throws Exception{
externalColl=true;
String collectionName = "myExternColl";
createCollection(collectionName, client, 2, 2);
waitForRecoveriesToFinish(collectionName, false);
assertTrue("does not exist collection state externally",
cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
Stat stat = new Stat();
byte[] data = cloudClient.getZkStateReader().getZkClient().getData(ZkStateReader.getCollectionPath(collectionName), null, stat, true);
DocCollection c = ZkStateReader.getCollectionLive(cloudClient.getZkStateReader(), collectionName);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
assertEquals("The zkversion of the nodes must be same zkver:" + stat.getVersion() , stat.getVersion(),clusterState.getCollection(collectionName).getZNodeVersion() );
assertTrue("DocCllection#getStateFormat() must be > 1", cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getStateFormat() > 1);
// remove collection
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.DELETE.toString());
params.set("name", collectionName);
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
if (client == null) {
client = createCloudClient(null);
}
client.request(request);
checkForMissingCollection(collectionName);
assertFalse("collection state should not exist externally", cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
}
}

View File

@ -50,9 +50,9 @@ public class SliceStateTest extends SolrTestCaseJ4 {
collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
ZkStateReader mockZkStateReader = ClusterStateTest.getMockZkStateReader(collectionStates.keySet());
ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates, mockZkStateReader);
ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
byte[] bytes = ZkStateReader.toJSON(clusterState);
ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes, mockZkStateReader,null);
ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
assertEquals("Default state not set to active", "active", loadedClusterState.getSlice("collection1", "shard1").getState());
}

View File

@ -18,9 +18,7 @@ package org.apache.solr.client.solrj.impl;
*/
import java.io.IOException;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -32,16 +30,13 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
@ -72,8 +67,6 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SolrJ client class to communicate with SolrCloud.
@ -86,8 +79,6 @@ import org.slf4j.LoggerFactory;
* with {@link #setIdField(String)}.
*/
public class CloudSolrServer extends SolrServer {
private static final Logger log = LoggerFactory.getLogger(CloudSolrServer.class);
private volatile ZkStateReader zkStateReader;
private String zkHost; // the zk server address
private int zkConnectTimeout = 10000;
@ -96,8 +87,6 @@ public class CloudSolrServer extends SolrServer {
private final LBHttpSolrServer lbServer;
private final boolean shutdownLBHttpSolrServer;
private HttpClient myClient;
//no of times collection state to be reloaded if stale state error is received
private static final int MAX_STALE_RETRIES = 5;
Random rand = new Random();
private final boolean updatesToLeaders;
@ -106,7 +95,6 @@ public class CloudSolrServer extends SolrServer {
.newCachedThreadPool(new SolrjNamedThreadFactory(
"CloudSolrServer ThreadPool"));
private String idField = "id";
public static final String STATE_VERSION = "_stateVer_";
private final Set<String> NON_ROUTABLE_PARAMS;
{
NON_ROUTABLE_PARAMS = new HashSet<>();
@ -124,36 +112,8 @@ public class CloudSolrServer extends SolrServer {
// NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
}
private volatile long timeToLive = 60* 1000L;
protected Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
@Override
public ExpiringCachedDocCollection get(Object key) {
ExpiringCachedDocCollection val = super.get(key);
if(val == null) return null;
if(val.isExpired(timeToLive)) {
super.remove(key);
return null;
}
return val;
}
};
class ExpiringCachedDocCollection {
DocCollection cached;
long cachedAt;
ExpiringCachedDocCollection(DocCollection cached) {
this.cached = cached;
this.cachedAt = System.currentTimeMillis();
}
boolean isExpired(long timeToLive) {
return (System.currentTimeMillis() - cachedAt) > timeToLive;
}
}
/**
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
@ -167,8 +127,6 @@ public class CloudSolrServer extends SolrServer {
this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = true;
shutdownLBHttpSolrServer = true;
setupStateVerParamOnQueryString(lbServer);
}
public CloudSolrServer(String zkHost, boolean updatesToLeaders)
@ -180,15 +138,6 @@ public class CloudSolrServer extends SolrServer {
this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = true;
setupStateVerParamOnQueryString(lbServer);
}
/**Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
* @param seconds ttl value in seconds
*/
public void setCollectionCacheTTl(int seconds){
assert seconds > 0;
timeToLive = seconds*1000L;
}
/**
@ -201,7 +150,6 @@ public class CloudSolrServer extends SolrServer {
this.lbServer = lbServer;
this.updatesToLeaders = true;
shutdownLBHttpSolrServer = false;
setupStateVerParamOnQueryString(lbServer);
}
/**
@ -215,22 +163,6 @@ public class CloudSolrServer extends SolrServer {
this.lbServer = lbServer;
this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = false;
setupStateVerParamOnQueryString(lbServer);
}
/**
* Used internally to setup the _stateVer_ param to be sent in the query string of requests
* coming from this instance.
*/
protected void setupStateVerParamOnQueryString(LBHttpSolrServer lbServer) {
// setup the stateVer param to be passed in the query string of every request
Set<String> queryStringParams = lbServer.getQueryParams();
if (queryStringParams == null) {
queryStringParams = new HashSet<String>(2);
lbServer.setQueryParams(queryStringParams);
}
queryStringParams.add(STATE_VERSION);
}
public ResponseParser getParser() {
@ -306,7 +238,8 @@ public class CloudSolrServer extends SolrServer {
if (zkStateReader == null) {
ZkStateReader zk = null;
try {
zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
zk = new ZkStateReader(zkHost, zkClientTimeout,
zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate();
zkStateReader = zk;
} catch (InterruptedException e) {
@ -369,7 +302,7 @@ public class CloudSolrServer extends SolrServer {
}
}
DocCollection col = getDocCollection(clusterState, collection);
DocCollection col = clusterState.getCollection(collection);
DocRouter router = col.getRouter();
@ -586,146 +519,7 @@ public class CloudSolrServer extends SolrServer {
}
@Override
public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
SolrParams reqParams = request.getParams();
String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection();
return requestWithRetryOnStaleState(request, 0, collection);
}
/**
* As this class doesn't watch external collections on the client side,
* there's a chance that the request will fail due to cached stale state,
* which means the state must be refreshed from ZK and retried.
*/
protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
throws SolrServerException, IOException {
connect(); // important to call this before you start working with the ZkStateReader
// build up a _stateVer_ param to pass to the server containing all of the
// external collection state versions involved in this request, which allows
// the server to notify us that our cached state for one or more of the external
// collections is stale and needs to be refreshed ... this code has no impact on internal collections
String stateVerParam = null;
List<DocCollection> requestedCollections = null;
if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests
Set<String> requestedCollectionNames = getCollectionList(getZkStateReader().getClusterState(), collection);
StringBuilder stateVerParamBuilder = null;
for (String requestedCollection : requestedCollectionNames) {
// track the version of state we're using on the client side using the _stateVer_ param
DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
int collVer = coll.getZNodeVersion();
if (coll.getStateFormat()>1) {
if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
requestedCollections.add(coll);
if (stateVerParamBuilder == null) {
stateVerParamBuilder = new StringBuilder();
} else {
stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
}
stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
}
}
if (stateVerParamBuilder != null) {
stateVerParam = stateVerParamBuilder.toString();
}
}
if (request.getParams() instanceof ModifiableSolrParams) {
ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
if (stateVerParam != null) {
params.set(STATE_VERSION, stateVerParam);
} else {
params.remove(STATE_VERSION);
}
} // else: ??? how to set this ???
NamedList<Object> resp = null;
try {
resp = sendRequest(request);
} catch (Exception exc) {
Throwable rootCause = SolrException.getRootCause(exc);
// don't do retry support for admin requests or if the request doesn't have a collection specified
if (collection == null || request.getPath().startsWith("/admin")) {
if (exc instanceof SolrServerException) {
throw (SolrServerException)exc;
} else if (exc instanceof IOException) {
throw (IOException)exc;
}else if (exc instanceof RuntimeException) {
throw (RuntimeException) exc;
}
else {
throw new SolrServerException(rootCause);
}
}
int errorCode = (rootCause instanceof SolrException) ?
((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
log.error("Request to collection {} failed due to ("+errorCode+
") {}, retry? "+retryCount, collection, rootCause.toString());
boolean wasCommError =
(rootCause instanceof ConnectException ||
rootCause instanceof ConnectTimeoutException ||
rootCause instanceof NoHttpResponseException ||
rootCause instanceof SocketException);
boolean stateWasStale = false;
if (retryCount < MAX_STALE_RETRIES &&
!requestedCollections.isEmpty() &&
SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
{
// cached state for one or more external collections was stale
// re-issue request using updated state
stateWasStale = true;
// just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
for (DocCollection ext : requestedCollections) {
collectionStateCache.remove(ext.getName());
}
}
// if we experienced a communication error, it's worth checking the state
// with ZK just to make sure the node we're trying to hit is still part of the collection
if (retryCount < MAX_STALE_RETRIES && !stateWasStale && !requestedCollections.isEmpty() && wasCommError) {
for (DocCollection ext : requestedCollections) {
DocCollection latestStateFromZk = getZkStateReader().getCollection(ext.getName());
if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
// looks like we couldn't reach the server because the state was stale == retry
stateWasStale = true;
// we just pulled state from ZK, so update the cache so that the retry uses it
collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
}
}
}
requestedCollections.clear(); // done with this
// if the state was stale, then we retry the request once with new state pulled from Zk
if (stateWasStale) {
log.warn("Re-trying request to collection(s) "+collection+" after stale state error from server.");
resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
} else {
if (exc instanceof SolrServerException) {
throw (SolrServerException)exc;
} else if (exc instanceof IOException) {
throw (IOException)exc;
} else {
throw new SolrServerException(rootCause);
}
}
}
return resp;
}
protected NamedList<Object> sendRequest(SolrRequest request)
public NamedList<Object> request(SolrRequest request)
throws SolrServerException, IOException {
connect();
@ -785,7 +579,7 @@ public class CloudSolrServer extends SolrServer {
// add it to the Map of slices.
Map<String,Slice> slices = new HashMap<>();
for (String collectionName : collectionsList) {
DocCollection col = getDocCollection(clusterState, collectionName);
DocCollection col = clusterState.getCollection(collectionName);
Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
ClientUtils.addSlices(slices, collectionName, routeSlices, true);
}
@ -882,11 +676,8 @@ public class CloudSolrServer extends SolrServer {
continue;
}
DocCollection docCollection = getDocCollection(clusterState, collection);
if (docCollection == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
}
}
collectionsList.add(collectionName);
}
@ -924,28 +715,6 @@ public class CloudSolrServer extends SolrServer {
return updatesToLeaders;
}
protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null;
if (cachedState != null && cachedState.cached != null) {
return cachedState.cached;
}
DocCollection col = clusterState.getCollectionOrNull(collection);
if(col == null ) return null;
collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
return col;
}
/**
* Extension point to allow sub-classes to override the ZkStateReader this class uses internally.
*/
protected ZkStateReader createZkStateReader(String zkHost, int zkClientTimeout, int zkConnectTimeout)
throws InterruptedException, TimeoutException, IOException, KeeperException {
ZkStateReader zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate();
return zk;
}
/**
* Useful for determining the minimum achieved replication factor across
* all shards involved in processing an update request, typically useful

View File

@ -46,25 +46,36 @@ public class ClusterState implements JSONWriter.Writable {
private final Map<String, DocCollection> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
private Set<String> liveNodes;
private final ZkStateReader stateReader;
/**
* Use this constr when ClusterState is meant for publication.
*
* hashCode and equals will only depend on liveNodes and not clusterStateVersion.
*/
@Deprecated
public ClusterState(Set<String> liveNodes,
Map<String, DocCollection> collectionStates) {
this(null, liveNodes, collectionStates);
}
/**
* Use this constr when ClusterState is meant for consumption.
*/
public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
Map<String, DocCollection> collectionStates, ZkStateReader stateReader) {
assert stateReader != null;
Map<String, DocCollection> collectionStates) {
this.zkClusterStateVersion = zkClusterStateVersion;
this.liveNodes = new HashSet<>(liveNodes.size());
this.liveNodes.addAll(liveNodes);
this.collectionStates = new LinkedHashMap<>(collectionStates.size());
this.collectionStates.putAll(collectionStates);
this.stateReader = stateReader;
}
public ClusterState copyWith(Map<String,DocCollection> modified){
ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates,stateReader);
ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates);
for (Entry<String, DocCollection> e : modified.entrySet()) {
DocCollection c = e.getValue();
if(c == null) {
@ -87,10 +98,17 @@ public class ClusterState implements JSONWriter.Writable {
if (slice == null) return null;
return slice.getLeader();
}
private Replica getReplica(DocCollection coll, String replicaName) {
if (coll == null) return null;
for (Slice slice : coll.getSlices()) {
Replica replica = slice.getReplica(replicaName);
if (replica != null) return replica;
}
return null;
}
public boolean hasCollection(String coll) {
if (collectionStates.containsKey(coll)) return true;
return stateReader.getAllCollections().contains(coll);
return collectionStates.containsKey(coll) ;
}
/**
@ -98,9 +116,8 @@ public class ClusterState implements JSONWriter.Writable {
* If the slice is known, do not use this method.
* coreNodeName is the same as replicaName
*/
public Replica getReplica(final String collection, final String coreNodeName, boolean cachedOnly) {
DocCollection coll = stateReader.getCollection(collection,cachedOnly);
return coll == null? null: coll.getReplica(coreNodeName);
public Replica getReplica(final String collection, final String coreNodeName) {
return getReplica(collectionStates.get(collection), coreNodeName);
}
/**
@ -136,35 +153,6 @@ public class ClusterState implements JSONWriter.Writable {
return coll.getActiveSlices();
}
/**
* Get the {@code DocCollection} object if available. This method will
* never hit ZooKeeper and attempt to fetch collection from locally available
* state only.
*
* @param collection the name of the collection
* @return the {@link org.apache.solr.common.cloud.DocCollection} or null if not found
*/
private DocCollection getCachedCollection(String collection) {
DocCollection c = collectionStates.get(collection);
if (c != null) return c;
if (!stateReader.getAllCollections().contains(collection)) return null;
return stateReader.getCollection(collection, true); // return from cache
}
/** expert internal use only
* Gets the replica from caches by the core name (assuming the slice is unknown) or null if replica is not found.
* If the slice is known, do not use this method.
* coreNodeName is the same as replicaName
*/
public Replica getCachedReplica(String collectionName, String coreNodeName) {
DocCollection c = getCachedCollection(collectionName);
if (c == null) return null;
for (Slice slice : c.getSlices()) {
Replica replica = slice.getReplica(coreNodeName);
if (replica != null) return replica;
}
return null;
}
/**
* Get the named DocCollection object, or throw an exception if it doesn't exist.
@ -175,26 +163,23 @@ public class ClusterState implements JSONWriter.Writable {
return coll;
}
public DocCollection getCollectionOrNull(String coll) {
DocCollection c = collectionStates.get(coll);
if (c != null) return c;
if (!stateReader.getAllCollections().contains(coll)) return null;
return stateReader.getCollection(coll);
return collectionStates.get(coll);
}
/**
* Get collection names.
*/
public Set<String> getCollections() {
return stateReader.getAllCollections();
return collectionStates.keySet();
}
/**
* For internal use only
* @return Map&lt;collectionName, Map&lt;sliceName,Slice&gt;&gt;
*/
Map<String, DocCollection> getCollectionStates() {
return collectionStates;
public Map<String, DocCollection> getCollectionStates() {
return Collections.unmodifiableMap(collectionStates);
}
/**
@ -253,7 +238,7 @@ public class ClusterState implements JSONWriter.Writable {
Stat stat = new Stat();
byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
null, stat, true);
return load(stat.getVersion(), state, liveNodes, stateReader, ZkStateReader.CLUSTER_STATE);
return load(stat.getVersion(), state, liveNodes);
}
@ -265,34 +250,25 @@ public class ClusterState implements JSONWriter.Writable {
* @param version zk version of the clusterstate.json file (bytes)
* @param bytes clusterstate.json as a byte array
* @param liveNodes list of live nodes
* @param stateReader The ZkStateReader for this clusterstate
* @param znode the znode from which this data is read from
* @return the ClusterState
*/
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, ZkStateReader stateReader, String znode) {
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
// System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
if (bytes == null || bytes.length == 0) {
return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap(),stateReader);
return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
}
Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
Map<String,DocCollection> collections = new LinkedHashMap<>(stateMap.size());
for (Entry<String, Object> entry : stateMap.entrySet()) {
String collectionName = entry.getKey();
DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version, znode);
DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version);
collections.put(collectionName, coll);
}
// System.out.println("######## ClusterState.load result:" + collections);
return new ClusterState( version, liveNodes, collections,stateReader);
return new ClusterState( version, liveNodes, collections);
}
/**
* @deprecated use {@link #load(Integer, byte[], Set, ZkStateReader, String)}
*/
@Deprecated
public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes){
return load(version == null ? -1: version, bytes, liveNodes,null,null);
}
public static Aliases load(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
@ -303,7 +279,7 @@ public class ClusterState implements JSONWriter.Writable {
return new Aliases(aliasMap);
}
private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version) {
Map<String,Object> props;
Map<String,Slice> slices;
@ -330,7 +306,7 @@ public class ClusterState implements JSONWriter.Writable {
router = DocRouter.getDocRouter((String) routerProps.get("name"));
}
return new DocCollection(name, slices, props, router, version,znode);
return new DocCollection(name, slices, props, router, version);
}
private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) {
@ -358,7 +334,7 @@ public class ClusterState implements JSONWriter.Writable {
*
* @return null if ClusterState was created for publication, not consumption
*/
public Integer getZnodeVersion() {
public Integer getZkClusterStateVersion() {
return zkClusterStateVersion;
}
@ -388,9 +364,6 @@ public class ClusterState implements JSONWriter.Writable {
}
public ZkStateReader getStateReader(){
return stateReader;
}
/**
* Internal API used only by ZkStateReader
@ -399,5 +372,9 @@ public class ClusterState implements JSONWriter.Writable {
this.liveNodes = liveNodes;
}
public DocCollection getCommonCollection(String name){
return collectionStates.get(name);
}
}

View File

@ -21,6 +21,7 @@ import org.noggit.JSONUtil;
import org.noggit.JSONWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@ -32,17 +33,15 @@ import java.util.Map;
public class DocCollection extends ZkNodeProps {
public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards";
public static final String STATE_FORMAT = "stateFormat";
private int znodeVersion;
private int version;
private final String name;
private final Map<String, Slice> slices;
private final Map<String, Slice> activeSlices;
private final DocRouter router;
private final String znode;
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE);
this(name, slices, props, router, -1);
}
/**
@ -50,9 +49,9 @@ public class DocCollection extends ZkNodeProps {
* @param slices The logical shards of the collection. This is used directly and a copy is not made.
* @param props The properties of the slice. This is used directly and a copy is not made.
*/
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) {
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion) {
super( props==null ? props = new HashMap<String,Object>() : props);
this.znodeVersion = zkVersion;
this.version = zkVersion;
this.name = name;
this.slices = slices;
@ -66,12 +65,8 @@ public class DocCollection extends ZkNodeProps {
this.activeSlices.put(slice.getKey(), slice.getValue());
}
this.router = router;
this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
assert name != null && slices != null;
}
public DocCollection copyWith(Map<String, Slice> slices){
return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
assert name != null && slices != null;
}
@ -115,16 +110,9 @@ public class DocCollection extends ZkNodeProps {
return activeSlices;
}
public int getZNodeVersion(){
return znodeVersion;
}
public int getVersion(){
return version;
public int getStateFormat(){
return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1:2;
}
public String getZNode(){
return znode;
}
@ -144,12 +132,4 @@ public class DocCollection extends ZkNodeProps {
all.put(SHARDS, slices);
jsonWriter.write(all);
}
public Replica getReplica(String coreNodeName) {
for (Slice slice : slices.values()) {
Replica replica = slice.getReplica(coreNodeName);
if (replica != null) return replica;
}
return null;
}
}

View File

@ -564,7 +564,6 @@ public class SolrZkClient {
}
public void close() {
// log.warn("closed inst :"+inst, new Exception("leakdebug"));
if (isClosed) return; // it's okay if we over close - same as solrcore
isClosed = true;
try {

View File

@ -98,14 +98,8 @@ public class ZkStateReader {
public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
public static final String SHARD_LEADERS_ZKNODE = "leaders";
private final Set<String> watchedCollections = new HashSet<String>();
/**These are collections which are actively watched by this instance .
*
*/
private Map<String , DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
private Set<String> allCollections = Collections.emptySet();
//
@ -168,8 +162,7 @@ public class ZkStateReader {
log.info("path={} {}={} specified config exists in ZooKeeper",
new Object[] {path, CONFIGNAME_PROP, configName});
}
} else {
throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
}
}
catch (KeeperException e) {
@ -258,21 +251,22 @@ public class ZkStateReader {
return aliases;
}
public Boolean checkValid(String coll, int version){
/*public Boolean checkValid(String coll, int version){
DocCollection collection = clusterState.getCollectionOrNull(coll);
if(collection ==null) return null;
if(collection.getZNodeVersion() < version){
log.info("server older than client {}<{}",collection.getZNodeVersion(),version);
DocCollection nu = getCollectionLive(this, coll);
if(nu.getZNodeVersion()> collection.getZNodeVersion()){
updateWatchedCollection(nu);
if(collection.getVersion() < version){
log.info("server older than client {}<{}",collection.getVersion(),version);
DocCollection nu = getExternCollectionFresh(this, coll);
if(nu.getVersion()> collection.getVersion()){
updateExternCollection(nu);
collection = nu;
}
}
if(collection.getZNodeVersion() == version) return Boolean.TRUE;
log.debug("wrong version from client {}!={} ",version, collection.getZNodeVersion());
if(collection.getVersion() == version) return Boolean.TRUE;
log.info("wrong version from client {}!={} ",version, collection.getVersion());
return Boolean.FALSE;
}
}*/
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
InterruptedException {
@ -305,11 +299,10 @@ public class ZkStateReader {
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
true);
Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln,ZkStateReader.this, null);
ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln);
// update volatile
ZkStateReader.this.clusterState = clusterState;
updateCollectionNames();
// HashSet<String> all = new HashSet<>(colls);;
// all.addAll(clusterState.getAllInternalCollections());
// all.remove(null);
@ -384,7 +377,6 @@ public class ZkStateReader {
liveNodeSet.addAll(liveNodes);
ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
this.clusterState = clusterState;
updateCollectionNames();
zkClient.exists(ALIASES,
new Watcher() {
@ -430,40 +422,6 @@ public class ZkStateReader {
}, true);
}
updateAliases();
//on reconnect of SolrZkClient re-add watchers for the watched external collections
synchronized (this) {
for (String watchedCollection : watchedCollections) {
addZkWatch(watchedCollection);
}
}
}
public void updateCollectionNames() throws KeeperException, InterruptedException {
Set<String> colls = getExternColls();
colls.addAll(clusterState.getCollectionStates().keySet());
allCollections = Collections.unmodifiableSet(colls);
}
private Set<String> getExternColls() throws KeeperException, InterruptedException {
List<String> children = null;
try {
children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
} catch (KeeperException.NoNodeException e) {
log.warn("Error fetching collection names");
return new HashSet<>();
}
if (children == null || children.isEmpty()) return new HashSet<>();
HashSet<String> result = new HashSet<>(children.size());
for (String c : children) {
try {
if (zkClient.exists(getCollectionPath(c), true)) result.add(c);
} catch (Exception e) {
log.warn("Error reading collections nodes", e);
}
}
return result;
}
@ -482,7 +440,7 @@ public class ZkStateReader {
liveNodesSet.addAll(liveNodes);
if (!onlyLiveNodes) {
log.debug("Updating cloud state from ZooKeeper... ");
log.info("Updating cloud state from ZooKeeper... ");
clusterState = ClusterState.load(zkClient, liveNodesSet,this);
} else {
@ -491,7 +449,6 @@ public class ZkStateReader {
clusterState.setLiveNodes(liveNodesSet);
}
this.clusterState = clusterState;
updateCollectionNames();
}
} else {
@ -550,11 +507,7 @@ public class ZkStateReader {
}
}, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
}
synchronized (this) {
for (String watchedCollection : watchedCollections) {
watchedCollectionStates.put(watchedCollection, getCollectionLive(this, watchedCollection));
}
}
}
/**
@ -679,9 +632,6 @@ public class ZkStateReader {
public SolrZkClient getZkClient() {
return zkClient;
}
public Set<String> getAllCollections(){
return allCollections;
}
public void updateAliases() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData(ALIASES, null, null, true);
@ -728,167 +678,4 @@ public class ZkStateReader {
}
}
public void updateWatchedCollection(DocCollection c) {
if(watchedCollections.contains(c.getName())){
watchedCollectionStates.put(c.getName(), c);
log.info("Updated DocCollection "+c.getName()+" to: ");
}
}
/**
* <b>Advance usage</b>
* This method can be used to fetch a collection object and control whether it hits
* the cache only or if information can be looked up from ZooKeeper.
*
* @param coll the collection name
* @param cachedCopyOnly whether to fetch data from cache only or if hitting Zookeeper is acceptable
* @return the {@link org.apache.solr.common.cloud.DocCollection}
*/
public DocCollection getCollection(String coll, boolean cachedCopyOnly) {
if(clusterState.getCollectionStates().get(coll) != null) {
//this collection resides in clusterstate.json. So it's always up-to-date
return clusterState.getCollectionStates().get(coll);
}
if (watchedCollections.contains(coll) || cachedCopyOnly) {
DocCollection c = watchedCollectionStates.get(coll);
if (c != null || cachedCopyOnly) return c;
}
return getCollectionLive(this, coll);
}
private Map ephemeralCollectionData;
/**
* this is only set by Overseer not to be set by others and only set inside the Overseer node. If Overseer has
unfinished external collections which are yet to be persisted to ZK
this map is populated and this class can use that information
@param map The map reference
*/
public void setEphemeralCollectionData(Map map){
ephemeralCollectionData = map;
}
public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
String collectionPath = getCollectionPath(coll);
if(zkStateReader.ephemeralCollectionData !=null ){
ClusterState cs = (ClusterState) zkStateReader.ephemeralCollectionData.get(collectionPath);
if(cs !=null) {
return cs.getCollectionStates().get(coll);
}
}
try {
if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null;
Stat stat = new Stat();
byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
ClusterState state = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(), zkStateReader, collectionPath);
return state.getCollectionStates().get(coll);
} catch (KeeperException.NoNodeException e) {
log.warn("No node available : " + collectionPath, e);
return null;
} catch (KeeperException e) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
}
}
public DocCollection getCollection(String coll) {
return getCollection(coll, false);
}
public static String getCollectionPath(String coll) {
return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
}
public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
synchronized (this){
if(watchedCollections.contains(coll)) return;
else {
watchedCollections.add(coll);
}
addZkWatch(coll);
}
}
private void addZkWatch(final String coll) throws KeeperException, InterruptedException {
log.info("addZkWatch {}", coll);
final String fullpath = getCollectionPath(coll);
synchronized (getUpdateLock()){
cmdExecutor.ensureExists(fullpath, zkClient);
log.info("Updating collection state at {} from ZooKeeper... ",fullpath);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
// session events are not change events,
// and do not remove the watcher
if (EventType.None.equals(event.getType())) {
return;
}
log.info("A cluster state change: {}, has occurred - updating... ", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
try {
// delayed approach
// ZkStateReader.this.updateClusterState(false, false);
synchronized (ZkStateReader.this.getUpdateLock()) {
if(!watchedCollections.contains(coll)) {
log.info("Unwatched collection {}",coll);
return;
}
// remake watch
final Watcher thisWatch = this;
Stat stat = new Stat();
byte[] data = zkClient.getData(fullpath, thisWatch, stat, true);
if(data == null || data.length ==0){
log.warn("No value set for collection state : {}", coll);
return;
}
ClusterState clusterState = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(),ZkStateReader.this,fullpath);
// update volatile
DocCollection newState = clusterState.getCollectionStates().get(coll);
watchedCollectionStates.put(coll, newState);
log.info("Updating data for {} to ver {} ", coll , newState.getZNodeVersion());
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("Unwatched collection :"+coll , e);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
"", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Unwatched collection :"+coll , e);
return;
}
}
};
zkClient.exists(fullpath, watcher, true);
}
watchedCollectionStates.put(coll, getCollectionLive(this, coll));
}
/**This is not a public API. Only used by ZkController */
public void removeZKWatch(final String coll){
synchronized (this){
watchedCollections.remove(coll);
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.solr.client.solrj.impl;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@ -41,6 +42,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@ -49,6 +51,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@ -59,6 +62,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -121,7 +125,6 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase {
@Override
public void doTest() throws Exception {
allTests();
stateVersionParamTest();
}
private void allTests() throws Exception {
@ -344,76 +347,6 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase {
indexDoc(doc);
}
private void stateVersionParamTest() throws Exception {
CloudSolrServer client = createCloudClient(null);
try {
String collectionName = "checkStateVerCol";
createCollection(collectionName, client, 2, 2);
waitForRecoveriesToFinish(collectionName, false);
DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
HttpSolrServer httpSolrServer = new HttpSolrServer(r.getStr(ZkStateReader.BASE_URL_PROP) + "/"+collectionName);
SolrQuery q = new SolrQuery().setQuery("*:*");
log.info("should work query, result {}", httpSolrServer.query(q));
//no problem
q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
log.info("2nd query , result {}", httpSolrServer.query(q));
//no error yet good
q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+ (coll.getZNodeVersion() -1)); //an older version expect error
HttpSolrServer.RemoteSolrException sse = null;
try {
httpSolrServer.query(q);
log.info("expected query error");
} catch (HttpSolrServer.RemoteSolrException e) {
sse = e;
}
httpSolrServer.shutdown();
assertNotNull(sse);
assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
//now send the request to another node that does n ot serve the collection
Set<String> allNodesOfColl = new HashSet<>();
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
}
}
String theNode = null;
for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
String n = client.getZkStateReader().getBaseUrlForNodeName(s);
if(!allNodesOfColl.contains(s)){
theNode = n;
break;
}
}
log.info("thenode which does not serve this collection{} ",theNode);
assertNotNull(theNode);
httpSolrServer = new HttpSolrServer(theNode + "/"+collectionName);
q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
try {
httpSolrServer.query(q);
log.info("error was expected");
} catch (HttpSolrServer.RemoteSolrException e) {
sse = e;
}
httpSolrServer.shutdown();
assertNotNull(sse);
assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
} finally {
client.shutdown();
}
}
public void testShutdown() throws MalformedURLException {
CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332");
try {

View File

@ -339,19 +339,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
return createJettys(numJettys, false);
}
protected int defaultStateFormat = 1 + random().nextInt(2);
protected int getStateFormat() {
String stateFormat = System.getProperty("tests.solr.stateFormat", null);
if (stateFormat != null) {
if ("2".equals(stateFormat)) {
return defaultStateFormat = 2;
} else if ("1".equals(stateFormat)) {
return defaultStateFormat = 1;
}
}
return defaultStateFormat; // random
}
/**
* @param checkCreatedVsState
@ -364,17 +351,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
List<SolrServer> clients = new ArrayList<>();
StringBuilder sb = new StringBuilder();
if(getStateFormat() == 2) {
log.info("Creating collection1 with stateFormat=2");
SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(ZkNodeProps.makeMap(
Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.CREATECOLLECTION,
"name", DEFAULT_COLLECTION,
"numShards", String.valueOf(sliceCount),
DocCollection.STATE_FORMAT, getStateFormat())));
zkClient.close();
}
for (int i = 1; i <= numJettys; i++) {
if (sb.length() > 0) sb.append(',');
int cnt = this.jettyIntCntr.incrementAndGet();
@ -1514,10 +1490,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
collectionInfos.put(collectionName, list);
}
params.set("name", collectionName);
if (getStateFormat() == 2) {
log.info("Creating collection with stateFormat=2: " + collectionName);
params.set(DocCollection.STATE_FORMAT, "2");
}
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");