SOLR-6629: Watch /collections zk node on all nodes

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1697562 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2015-08-25 05:56:35 +00:00
parent bdbf4cea57
commit 99a31e17d0
9 changed files with 189 additions and 113 deletions

View File

@ -161,6 +161,10 @@ Optimizations
message processing performance by ~470%.
(Noble Paul, Scott Blum, shalin)
* SOLR-6629: Watch /collections zk node on all nodes so that cluster state updates
are more efficient especially when cluster has a mix of collections in stateFormat=1
and stateFormat=2. (Scott Blum, shalin)
Other Changes
----------------------

View File

@ -2006,7 +2006,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
boolean created = false;
while (! waitUntil.hasTimedOut()) {
Thread.sleep(100);
created = zkStateReader.getClusterState().getCollections().contains(collectionName);
created = zkStateReader.getClusterState().hasCollection(collectionName);
if(created) break;
}
if (!created)

View File

@ -633,8 +633,9 @@ public final class ZkController {
cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH,"{}".getBytes(StandardCharsets.UTF_8),CreateMode.PERSISTENT, zkClient);
byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
}
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {

View File

@ -125,14 +125,24 @@ public class ZkStateWriter {
callback.onEnqueue();
}
/*
We need to know if the collection has moved from stateFormat=1 to stateFormat=2 (as a result of MIGRATECLUSTERSTATE)
*/
DocCollection previousCollection = prevState.getCollectionOrNull(cmd.name);
boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
boolean isCurrentlyStateFormat1 = cmd.collection != null && cmd.collection.getStateFormat() == 1;
if (cmd.collection == null) {
isClusterStateModified = true;
if (wasPreviouslyStateFormat1) {
isClusterStateModified = true;
}
clusterState = prevState.copyWith(cmd.name, null);
updates.put(cmd.name, null);
} else {
if (cmd.collection.getStateFormat() > 1) {
if (!isCurrentlyStateFormat1) {
updates.put(cmd.name, cmd.collection);
} else {
}
if (isCurrentlyStateFormat1 || wasPreviouslyStateFormat1) {
isClusterStateModified = true;
}
clusterState = prevState.copyWith(cmd.name, cmd.collection);
@ -211,6 +221,7 @@ public class ZkStateWriter {
if (c == null) {
// let's clean up the collections path for this collection
log.info("going to delete_collection {}", path);
reader.getZkClient().clean("/collections/" + name);
} else if (c.getStateFormat() > 1) {
byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
@ -225,7 +236,6 @@ public class ZkStateWriter {
reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
clusterState = clusterState.copyWith(name, newCollection);
isClusterStateModified = true;
}
} else if (c.getStateFormat() == 1) {
isClusterStateModified = true;
@ -237,7 +247,6 @@ public class ZkStateWriter {
if (isClusterStateModified) {
assert clusterState.getZkClusterStateVersion() >= 0;
lastUpdatedTime = System.nanoTime();
byte[] data = Utils.toJSON(clusterState);
Stat stat = reader.getZkClient().setData(ZkStateReader.CLUSTER_STATE, data, clusterState.getZkClusterStateVersion(), true);
Set<String> collectionNames = clusterState.getCollections();
@ -249,6 +258,7 @@ public class ZkStateWriter {
clusterState = new ClusterState(stat.getVersion(), reader.getClusterState().getLiveNodes(), collectionStates);
isClusterStateModified = false;
}
lastUpdatedTime = System.nanoTime();
success = true;
} catch (KeeperException.BadVersionException bve) {
// this is a tragic error, we must disallow usage of this instance

View File

@ -680,7 +680,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
while (version == getClusterStateVersion(zkClient));
Thread.sleep(500);
assertFalse("collection1 should be gone after publishing the null state",
reader.getClusterState().getCollections().contains(collection));
reader.getClusterState().hasCollection(collection));
} finally {
close(mockController);
close(overseerClient);

View File

@ -39,15 +39,25 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
/** Uses explicit refresh to ensure latest changes are visible. */
public void testStateFormatUpdateWithExplicitRefresh() throws Exception {
testStateFormatUpdate(true);
testStateFormatUpdate(true, true);
}
/** Uses explicit refresh to ensure latest changes are visible. */
public void testStateFormatUpdateWithExplicitRefreshLazy() throws Exception {
testStateFormatUpdate(true, false);
}
/** ZkStateReader should automatically pick up changes based on ZK watches. */
public void testStateFormatUpdateWithTimeDelay() throws Exception {
testStateFormatUpdate(false);
testStateFormatUpdate(false, true);
}
public void testStateFormatUpdate(boolean explicitRefresh) throws Exception {
/** ZkStateReader should automatically pick up changes based on ZK watches. */
public void testStateFormatUpdateWithTimeDelayLazy() throws Exception {
testStateFormatUpdate(false, false);
}
public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting) throws Exception {
String zkDir = createTempDir("testStateFormatUpdate").toFile().getAbsolutePath();
ZkTestServer server = new ZkTestServer(zkDir);
@ -64,7 +74,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
int trackedStateVersion = reader.getClusterState().getZkClusterStateVersion();
if (isInteresting) {
reader.addCollectionWatch("c1");
}
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
@ -82,7 +94,16 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
assertFalse(exists);
trackedStateVersion = refresh(reader, trackedStateVersion, explicitRefresh);
if (explicitRefresh) {
reader.updateClusterState();
} else {
for (int i = 0; i < 100; ++i) {
if (reader.getClusterState().hasCollection("c1")) {
break;
}
Thread.sleep(50);
}
}
DocCollection collection = reader.getClusterState().getCollection("c1");
assertEquals(1, collection.getStateFormat());
@ -101,7 +122,16 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
assertTrue(exists);
trackedStateVersion = refresh(reader, trackedStateVersion, explicitRefresh);
if (explicitRefresh) {
reader.updateClusterState();
} else {
for (int i = 0; i < 100; ++i) {
if (reader.getClusterState().getCollection("c1").getStateFormat() == 2) {
break;
}
Thread.sleep(50);
}
}
DocCollection collection = reader.getClusterState().getCollection("c1");
assertEquals(2, collection.getStateFormat());
@ -138,7 +168,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
writer.enqueueUpdate(reader.getClusterState(), c1, null);
writer.writePendingUpdates();
refresh(reader, 0, true);
reader.updateClusterState();
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
reader.addCollectionWatch("c1");
@ -151,20 +181,4 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
server.shutdown();
}
}
private static int refresh(ZkStateReader reader, int trackedStateVersion, boolean explicitRefresh) throws KeeperException, InterruptedException {
if (explicitRefresh) {
reader.updateClusterState();
return reader.getClusterState().getZkClusterStateVersion();
}
for (int i = 0; i < 100; ++i) {
// Loop until we observe the change.
int newStateVersion = reader.getClusterState().getZkClusterStateVersion();
if (newStateVersion > trackedStateVersion) {
return newStateVersion;
}
Thread.sleep(100);
}
throw new AssertionError("Did not observe expected update");
}
}

View File

@ -1094,7 +1094,7 @@ public class CloudSolrClient extends SolrClient {
Set<String> collectionNames = new HashSet<>();
// validate collections
for (String collectionName : rawCollectionsList) {
if (!clusterState.getCollections().contains(collectionName)) {
if (!clusterState.hasCollection(collectionName)) {
Aliases aliases = zkStateReader.getAliases();
String alias = aliases.getCollectionAlias(collectionName);
if (alias != null) {

View File

@ -17,13 +17,6 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.Utils;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -33,6 +26,14 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.Watcher;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Immutable state of the cloud. Normally you can get the state by using
* {@link ZkStateReader#getClusterState()}.
@ -110,8 +111,16 @@ public class ClusterState implements JSONWriter.Writable {
return null;
}
public boolean hasCollection(String coll) {
return collectionStates.containsKey(coll) ;
/**
* Returns true if the specified collection name exists, false otherwise.
*
* Implementation note: This method resolves the collection reference by calling
* {@link CollectionRef#get()} which can make a call to ZooKeeper. This is necessary
* because the semantics of how collection list is loaded have changed in SOLR-6629.
* Please javadocs in {@link ZkStateReader#refreshCollectionList(Watcher)}
*/
public boolean hasCollection(String collectionName) {
return getCollectionOrNull(collectionName) != null;
}
/**
@ -170,19 +179,38 @@ public class ClusterState implements JSONWriter.Writable {
return collectionStates.get(coll);
}
public DocCollection getCollectionOrNull(String coll) {
CollectionRef ref = collectionStates.get(coll);
return ref == null? null:ref.get();
/**
* Returns the corresponding {@link DocCollection} object for the given collection name
* if such a collection exists. Returns null otherwise.
*
* Implementation note: This method resolves the collection reference by calling
* {@link CollectionRef#get()} which can make a call to ZooKeeper. This is necessary
* because the semantics of how collection list is loaded have changed in SOLR-6629.
* Please javadocs in {@link ZkStateReader#refreshCollectionList(Watcher)}
*/
public DocCollection getCollectionOrNull(String collectionName) {
CollectionRef ref = collectionStates.get(collectionName);
return ref == null ? null : ref.get();
}
/**
* Get collection names.
*
* Implementation note: This method resolves the collection reference by calling
* {@link CollectionRef#get()} which can make a call to ZooKeeper. This is necessary
* because the semantics of how collection list is loaded have changed in SOLR-6629.
* Please javadocs in {@link ZkStateReader#refreshCollectionList(Watcher)}
*/
public Set<String> getCollections() {
return collectionStates.keySet();
Set<String> result = new HashSet<>();
for (Entry<String, CollectionRef> entry : collectionStates.entrySet()) {
if (entry.getValue().get() != null) {
result.add(entry.getKey());
}
}
return result;
}
/**
* Get names of the currently live nodes.
*/

View File

@ -23,7 +23,6 @@ import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@ -121,7 +120,7 @@ public class ZkStateReader implements Closeable {
private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
/** Collections with format2 state.json, not "interesting" and not actively watched. */
private volatile Map<String, ClusterState.CollectionRef> lazyCollectionStates = new HashMap<>();
private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<String, LazyCollectionRef>();
private volatile Set<String> liveNodes = emptySet();
@ -261,7 +260,7 @@ public class ZkStateReader implements Closeable {
DocCollection newState = fetchCollectionState(coll, null);
updateWatchedCollection(coll, newState);
}
refreshLazyFormat2Collections(true);
refreshCollectionList(null);
refreshLiveNodes(null);
constructState();
}
@ -313,7 +312,7 @@ public class ZkStateReader implements Closeable {
// on reconnect of SolrZkClient force refresh and re-add watches.
refreshLegacyClusterState(new LegacyClusterStateWatcher());
refreshStateFormat2Collections();
refreshLazyFormat2Collections(true);
refreshCollectionList(new CollectionsChildWatcher());
refreshLiveNodes(new LiveNodeWatcher());
synchronized (ZkStateReader.this.getUpdateLock()) {
@ -448,7 +447,7 @@ public class ZkStateReader implements Closeable {
}
// Finally, add any lazy collections that aren't already accounted for.
for (Map.Entry<String, ClusterState.CollectionRef> entry : lazyCollectionStates.entrySet()) {
for (Map.Entry<String, LazyCollectionRef> entry : lazyCollectionStates.entrySet()) {
result.putIfAbsent(entry.getKey(), entry.getValue());
}
@ -496,78 +495,69 @@ public class ZkStateReader implements Closeable {
/**
* Search for any lazy-loadable state format2 collections.
*
* A stateFormat=1 collection which is not interesting to us can also
* be put into the {@link #lazyCollectionStates} map here. But that is okay
* because {@link #constructState()} will give priority to collections in the
* shared collection state over this map.
* In fact this is a clever way to avoid doing a ZK exists check on
* the /collections/collection_name/state.json znode
* Such an exists check is done in {@link ClusterState#hasCollection(String)} and
* {@link ClusterState#getCollections()} method as a safeguard against exposing wrong collection names to the users
*/
private void refreshLazyFormat2Collections(boolean fullRefresh) throws KeeperException, InterruptedException {
private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
List<String> children = null;
try {
children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
} catch (KeeperException.NoNodeException e) {
log.warn("Error fetching collection names");
// fall through
}
if (children == null || children.isEmpty()) {
synchronized (getUpdateLock()) {
this.lazyCollectionStates = new HashMap<>();
}
lazyCollectionStates.clear();
return;
}
Map<String, ClusterState.CollectionRef> result = new HashMap<>();
for (String collName : children) {
if (interestingCollections.contains(collName)) {
// We will create an eager collection for any interesting collections.
continue;
}
// Don't mess with watchedCollections, they should self-manage.
if (!fullRefresh) {
// Try to use an already-created lazy collection if it's not a full refresh.
ClusterState.CollectionRef existing = lazyCollectionStates.get(collName);
if (existing != null) {
result.put(collName, existing);
continue;
// First, drop any children that disappeared.
this.lazyCollectionStates.keySet().retainAll(children);
for (String coll : children) {
// We will create an eager collection for any interesting collections, so don't add to lazy.
if (!interestingCollections.contains(coll)) {
// Double check contains just to avoid allocating an object.
LazyCollectionRef existing = lazyCollectionStates.get(coll);
if (existing == null) {
lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
}
}
ClusterState.CollectionRef lazyCollectionState = tryMakeLazyCollectionStateFormat2(collName);
if (lazyCollectionState != null) {
result.put(collName, lazyCollectionState);
}
}
synchronized (getUpdateLock()) {
this.lazyCollectionStates = result;
}
}
private ClusterState.CollectionRef tryMakeLazyCollectionStateFormat2(final String collName) {
boolean exists = false;
try {
exists = zkClient.exists(getCollectionPath(collName), true);
} catch (Exception e) {
log.warn("Error reading collections nodes", e);
}
if (!exists) {
return null;
private class LazyCollectionRef extends ClusterState.CollectionRef {
private final String collName;
public LazyCollectionRef(String collName) {
super(null);
this.collName = collName;
}
// if it is not collection, then just create a reference which can fetch
// the collection object just in time from ZK
return new ClusterState.CollectionRef(null) {
@Override
public DocCollection get() {
return getCollectionLive(ZkStateReader.this, collName);
}
@Override
public DocCollection get() {
// TODO: consider limited caching
return getCollectionLive(ZkStateReader.this, collName);
}
@Override
public boolean isLazilyLoaded() {
return true;
}
@Override
public boolean isLazilyLoaded() {
return true;
}
@Override
public String toString() {
return "lazy DocCollection(" + collName + ")";
}
};
@Override
public String toString() {
return "LazyCollectionRef(" + collName + ")";
}
}
/**
@ -926,9 +916,6 @@ public class ZkStateReader implements Closeable {
public void refreshAndWatch() {
try {
refreshLegacyClusterState(this);
// Changes to clusterstate.json signal global state changes.
// TODO: get rid of clusterstate.json as a signaling mechanism.
refreshLazyFormat2Collections(false);
} catch (KeeperException.NoNodeException e) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
@ -949,6 +936,44 @@ public class ZkStateReader implements Closeable {
}
}
/** Watches /collections children . */
class CollectionsChildWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
// session events are not change events,
// and do not remove the watcher
if (EventType.None.equals(event.getType())) {
return;
}
log.info("A collections change: {}, has occurred - updating...", (event));
refreshAndWatch();
synchronized (getUpdateLock()) {
constructState();
}
}
/** Must hold {@link #getUpdateLock()} before calling this method. */
public void refreshAndWatch() {
try {
refreshCollectionList(this);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
}
}
}
/** Watches the live_nodes and syncs changes. */
class LiveNodeWatcher implements Watcher {
@ -963,7 +988,6 @@ public class ZkStateReader implements Closeable {
refreshAndWatch();
}
/** Must hold {@link #getUpdateLock()} before calling this method. */
public void refreshAndWatch() {
try {
refreshLiveNodes(this);
@ -1068,13 +1092,8 @@ public class ZkStateReader implements Closeable {
log.info("Removing watch for uninteresting collection {}", coll);
interestingCollections.remove(coll);
watchedCollectionStates.remove(coll);
ClusterState.CollectionRef lazyCollectionStateFormat2 = tryMakeLazyCollectionStateFormat2(coll);
lazyCollectionStates.put(coll, new LazyCollectionRef(coll));
synchronized (getUpdateLock()) {
if (lazyCollectionStateFormat2 != null) {
this.lazyCollectionStates.put(coll, lazyCollectionStateFormat2);
} else {
this.lazyCollectionStates.remove(coll);
}
constructState();
}
}