SOLR-8323: Add CollectionStateWatcher API

This commit is contained in:
Alan Woodward 2016-04-14 16:21:35 +01:00
parent e94ffde44e
commit 06d2f6368d
13 changed files with 716 additions and 92 deletions

View File

@ -111,6 +111,8 @@ New Features
* SOLR-8208: [subquery] document transformer executes separate requests per result document. (Cao Manh Dat via Mikhail Khludnev)
* SOLR-8323: All CollectionStateWatcher API (Alan Woodward, Scott Blum)
Bug Fixes
----------------------

View File

@ -1215,23 +1215,10 @@ public final class ZkController {
if (context != null) {
context.cancelElection();
}
final Collection<SolrCore> cores = cc.getCores();
// if there is no SolrCore which is a member of this collection, remove the watch
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
boolean removeWatch = true;
for (SolrCore solrCore : cores) {
final CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor();
if (cloudDesc != null && cloudDescriptor.getCollectionName().equals(cloudDesc.getCollectionName())) {
removeWatch = false;
break;
}
}
if (removeWatch) {
zkStateReader.removeZKWatch(collection);
}
zkStateReader.unregisterCore(cloudDescriptor.getCollectionName());
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(),
@ -1481,7 +1468,7 @@ public final class ZkController {
"Collection {} not visible yet, but flagging it so a watch is registered when it becomes visible" :
"Registering watch for collection {}",
collectionName);
zkStateReader.addCollectionWatch(collectionName);
zkStateReader.registerCore(collectionName);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);

View File

@ -63,6 +63,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
try {
server.run();
@ -72,10 +73,10 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
if (isInteresting) {
reader.addCollectionWatch("c1");
reader.registerCore("c1");
}
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
@ -137,7 +138,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
assertEquals(2, collection.getStateFormat());
}
} finally {
IOUtils.close(zkClient);
IOUtils.close(reader, zkClient);
server.shutdown();
}
@ -147,6 +148,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
String zkDir = createTempDir("testExternalCollectionWatchedNotWatched").toFile().getAbsolutePath();
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
try {
server.run();
@ -156,7 +158,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
@ -171,13 +173,13 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
reader.forceUpdateCollection("c1");
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
reader.addCollectionWatch("c1");
reader.registerCore("c1");
assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
reader.removeZKWatch("c1");
reader.unregisterCore("c1");
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
} finally {
IOUtils.close(zkClient);
IOUtils.close(reader, zkClient);
server.shutdown();
}
}
@ -188,6 +190,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
try {
server.run();
@ -197,9 +200,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
reader.addCollectionWatch("c1");
reader.registerCore("c1");
// Initially there should be no c1 collection.
assertNull(reader.getClusterState().getCollectionRef("c1"));
@ -235,7 +238,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
assertFalse(ref.isLazilyLoaded());
assertEquals(2, ref.get().getStateFormat());
} finally {
IOUtils.close(zkClient);
IOUtils.close(reader, zkClient);
server.shutdown();
}

View File

@ -56,6 +56,8 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
@ -572,6 +574,40 @@ public class CloudSolrClient extends SolrClient {
zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath);
}
/**
* Block until a collection state matches a predicate, or a timeout
*
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
*
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
* @param predicate a {@link CollectionStatePredicate} to check the collection state
* @throws InterruptedException on interrupt
* @throws TimeoutException on timeout
*/
public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
throws InterruptedException, TimeoutException {
connect();
zkStateReader.waitForState(collection, wait, unit, predicate);
}
/**
* Register a CollectionStateWatcher to be called when the cluster state for a collection changes
*
* Note that the watcher is unregistered after it has been called once. To make a watcher persistent,
* it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
* call
*
* @param collection the collection to watch
* @param watcher a watcher that will be called when the state changes
*/
public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
connect();
zkStateReader.registerCollectionStateWatcher(collection, watcher);
}
private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection, ClusterState clusterState) throws SolrServerException {
UpdateRequest updateRequest = (UpdateRequest) request;
ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();

View File

@ -0,0 +1,42 @@
package org.apache.solr.common.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Interface to determine if a collection state matches a required state
*
* @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate)
*/
public interface CollectionStatePredicate {
/**
* Check the collection state matches a required state
*
* Note that both liveNodes and collectionState should be consulted to determine
* the overall state.
*
* @param liveNodes the current set of live nodes
* @param collectionState the latest collection state, or null if the collection
* does not exist
*/
boolean matches(Set<String> liveNodes, DocCollection collectionState);
}

View File

@ -0,0 +1,42 @@
package org.apache.solr.common.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Set;
/**
* Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)}
* and called whenever the collection state changes.
*/
public interface CollectionStateWatcher {
/**
* Called when the collection we are registered against has a change of state
*
* Note that, due to the way Zookeeper watchers are implemented, a single call may be
* the result of several state changes
*
* A watcher is unregistered after it has been called once. To make a watcher persistent,
* implementors should re-register during this call.
*
* @param liveNodes the set of live nodes
* @param collectionState the new collection state
*/
void onStateChanged(Set<String> liveNodes, DocCollection collectionState);
}

View File

@ -22,6 +22,8 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@ -35,7 +37,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
/**
* Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection")
*/
public class DocCollection extends ZkNodeProps {
public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards";
public static final String STATE_FORMAT = "stateFormat";
@ -217,4 +220,27 @@ public class DocCollection extends ZkNodeProps {
if (slice == null) return null;
return slice.getLeader();
}
/**
* Check that all replicas in a collection are live
*
* @see CollectionStatePredicate
*/
public static boolean isFullyActive(Set<String> liveNodes, DocCollection collectionState) {
Objects.requireNonNull(liveNodes);
if (collectionState == null)
return false;
for (Slice slice : collectionState) {
for (Replica replica : slice) {
if (replica.isActive(liveNodes) == false)
return false;
}
}
return true;
}
@Override
public Iterator<Slice> iterator() {
return slices.values().iterator();
}
}

View File

@ -16,13 +16,15 @@
*/
package org.apache.solr.common.cloud;
import static org.apache.solr.common.cloud.ZkStateReader.*;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.noggit.JSONUtil;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
public class Replica extends ZkNodeProps {
/**
@ -116,6 +118,10 @@ public class Replica extends ZkNodeProps {
return state;
}
public boolean isActive(Set<String> liveNodes) {
return liveNodes.contains(this.nodeName) && this.state == State.ACTIVE;
}
@Override
public String toString() {
return name + ':' + JSONUtil.toJSON(propMap, -1); // small enough, keep it on one line (i.e. no indent)

View File

@ -19,6 +19,7 @@ package org.apache.solr.common.cloud;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
@ -29,24 +30,29 @@ import org.noggit.JSONWriter;
/**
* A Slice contains immutable information about a logical shard (all replicas that share the same shard id).
*/
public class Slice extends ZkNodeProps {
public class Slice extends ZkNodeProps implements Iterable<Replica> {
/** Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON. */
public static Map<String,Slice> loadAllFromMap(Map<String, Object> genericSlices) {
if (genericSlices == null) return Collections.emptyMap();
Map<String,Slice> result = new LinkedHashMap<>(genericSlices.size());
for (Map.Entry<String,Object> entry : genericSlices.entrySet()) {
Map<String, Slice> result = new LinkedHashMap<>(genericSlices.size());
for (Map.Entry<String, Object> entry : genericSlices.entrySet()) {
String name = entry.getKey();
Object val = entry.getValue();
if (val instanceof Slice) {
result.put(name, (Slice)val);
result.put(name, (Slice) val);
} else if (val instanceof Map) {
result.put(name, new Slice(name, null, (Map<String,Object>)val));
result.put(name, new Slice(name, null, (Map<String, Object>) val));
}
}
return result;
}
@Override
public Iterator<Replica> iterator() {
return replicas.values().iterator();
}
/** The slice's state. */
public enum State {

View File

@ -28,15 +28,22 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.common.Callable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
@ -111,12 +118,10 @@ public class ZkStateReader implements Closeable {
public static final String SHARD_LEADERS_ZKNODE = "leaders";
public static final String ELECTION_NODE = "election";
/** Collections we actively care about, and will try to keep watch on. */
private final Set<String> interestingCollections = Collections.newSetFromMap(new ConcurrentHashMap<>());
/** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */
private Map<String, ClusterState.CollectionRef> legacyCollectionStates = emptyMap();
/** Last seen ZK version of clusterstate.json. */
private int legacyClusterStateVersion = 0;
@ -134,6 +139,21 @@ public class ZkStateReader implements Closeable {
private final Runnable securityNodeListener;
private ConcurrentHashMap<String, CollectionWatch> collectionWatches = new ConcurrentHashMap<>();
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
private class CollectionWatch {
int coreRefCount = 0;
Set<CollectionStateWatcher> stateWatchers = new HashSet<>();
public boolean canBeRemoved() {
return coreRefCount + stateWatchers.size() == 0;
}
}
public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList(
LEGACY_CLOUD,
URL_SCHEME,
@ -262,6 +282,7 @@ public class ZkStateReader implements Closeable {
* a better design is possible.
*/
public void forceUpdateCollection(String collection) throws KeeperException, InterruptedException {
synchronized (getUpdateLock()) {
if (clusterState == null) {
return;
@ -295,6 +316,7 @@ public class ZkStateReader implements Closeable {
}
constructState();
}
}
/** Refresh the set of live nodes. */
@ -341,10 +363,10 @@ public class ZkStateReader implements Closeable {
}
// on reconnect of SolrZkClient force refresh and re-add watches.
refreshLiveNodes(new LiveNodeWatcher());
refreshLegacyClusterState(new LegacyClusterStateWatcher());
refreshStateFormat2Collections();
refreshCollectionList(new CollectionsChildWatcher());
refreshLiveNodes(new LiveNodeWatcher());
synchronized (ZkStateReader.this.getUpdateLock()) {
constructState();
@ -458,7 +480,7 @@ public class ZkStateReader implements Closeable {
this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
LOG.debug("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]",
legacyCollectionStates.keySet().size(),
interestingCollections.size(),
collectionWatches.keySet().size(),
watchedCollectionStates.keySet().size(),
lazyCollectionStates.keySet().size(),
clusterState.getCollectionStates().size());
@ -466,7 +488,7 @@ public class ZkStateReader implements Closeable {
if (LOG.isTraceEnabled()) {
LOG.trace("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]",
legacyCollectionStates.keySet(),
interestingCollections,
collectionWatches.keySet(),
watchedCollectionStates.keySet(),
lazyCollectionStates.keySet(),
clusterState.getCollectionStates());
@ -476,8 +498,7 @@ public class ZkStateReader implements Closeable {
/**
* Refresh legacy (shared) clusterstate.json
*/
private void refreshLegacyClusterState(Watcher watcher)
throws KeeperException, InterruptedException {
private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, InterruptedException {
try {
final Stat stat = new Stat();
final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
@ -487,6 +508,20 @@ public class ZkStateReader implements Closeable {
// Nothing to do, someone else updated same or newer.
return;
}
Set<String> liveNodes = this.liveNodes; // volatile read
for (Map.Entry<String, CollectionWatch> watchEntry : this.collectionWatches.entrySet()) {
String coll = watchEntry.getKey();
CollectionWatch collWatch = watchEntry.getValue();
ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll);
if (ref == null)
continue;
// legacy collections are always in-memory
DocCollection newState = ref.get();
if (!collWatch.stateWatchers.isEmpty()
&& !Objects.equals(loadedData.getCollectionStates().get(coll).get(), newState)) {
notifyStateWatchers(liveNodes, coll, newState);
}
}
this.legacyCollectionStates = loadedData.getCollectionStates();
this.legacyClusterStateVersion = stat.getVersion();
}
@ -503,9 +538,8 @@ public class ZkStateReader implements Closeable {
* Refresh state format2 collections.
*/
private void refreshStateFormat2Collections() {
// It's okay if no format2 state.json exists, if one did not previous exist.
for (String coll : interestingCollections) {
new StateWatcher(coll).refreshAndWatch(watchedCollectionStates.containsKey(coll));
for (String coll : collectionWatches.keySet()) {
new StateWatcher(coll).refreshAndWatch();
}
}
@ -546,7 +580,7 @@ public class ZkStateReader implements Closeable {
this.lazyCollectionStates.keySet().retainAll(children);
for (String coll : children) {
// We will create an eager collection for any interesting collections, so don't add to lazy.
if (!interestingCollections.contains(coll)) {
if (!collectionWatches.containsKey(coll)) {
// Double check contains just to avoid allocating an object.
LazyCollectionRef existing = lazyCollectionStates.get(coll);
if (existing == null) {
@ -637,6 +671,7 @@ public class ZkStateReader implements Closeable {
public void close() {
this.closed = true;
notifications.shutdown();
if (closeClient) {
zkClient.close();
}
@ -888,7 +923,7 @@ public class ZkStateReader implements Closeable {
return;
}
if (!interestingCollections.contains(coll)) {
if (!collectionWatches.containsKey(coll)) {
// This collection is no longer interesting, stop watching.
LOG.info("Uninteresting collection {}", coll);
return;
@ -899,27 +934,22 @@ public class ZkStateReader implements Closeable {
LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])",
event, coll, liveNodesSize);
refreshAndWatch(true);
refreshAndWatch();
synchronized (getUpdateLock()) {
constructState();
}
}
/**
* Refresh collection state from ZK and leave a watch for future changes.
* As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates}
* with the results of the refresh.
*
* @param expectExists if true, error if no state node exists
*/
public void refreshAndWatch(boolean expectExists) {
public void refreshAndWatch() {
try {
DocCollection newState = fetchCollectionState(coll, this);
updateWatchedCollection(coll, newState);
} catch (KeeperException.NoNodeException e) {
if (expectExists) {
LOG.warn("State node vanished for collection: [{}]", coll, e);
}
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
@ -1071,32 +1101,190 @@ public class ZkStateReader implements Closeable {
return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
}
public void addCollectionWatch(String coll) {
if (interestingCollections.add(coll)) {
LOG.info("addZkWatch [{}]", coll);
new StateWatcher(coll).refreshAndWatch(false);
/**
* Notify this reader that a local Core is a member of a collection, and so that collection
* state should be watched.
*
* Not a public API. This method should only be called from ZkController.
*
* The number of cores per-collection is tracked, and adding multiple cores from the same
* collection does not increase the number of watches.
*
* @param collection the collection that the core is a member of
*
* @see ZkStateReader#unregisterCore(String)
*/
public void registerCore(String collection) {
AtomicBoolean reconstructState = new AtomicBoolean(false);
collectionWatches.compute(collection, (k, v) -> {
if (v == null) {
reconstructState.set(true);
v = new CollectionWatch();
}
v.coreRefCount++;
return v;
});
if (reconstructState.get()) {
new StateWatcher(collection).refreshAndWatch();
synchronized (getUpdateLock()) {
constructState();
}
}
}
/**
* Notify this reader that a local core that is a member of a collection has been closed.
*
* Not a public API. This method should only be called from ZkController.
*
* If no cores are registered for a collection, and there are no {@link CollectionStateWatcher}s
* for that collection either, the collection watch will be removed.
*
* @param collection the collection that the core belongs to
*/
public void unregisterCore(String collection) {
AtomicBoolean reconstructState = new AtomicBoolean(false);
collectionWatches.compute(collection, (k, v) -> {
if (v == null)
return null;
if (v.coreRefCount > 0)
v.coreRefCount--;
if (v.canBeRemoved()) {
watchedCollectionStates.remove(collection);
lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
reconstructState.set(true);
return null;
}
return v;
});
if (reconstructState.get()) {
synchronized (getUpdateLock()) {
constructState();
}
}
}
/**
* Register a CollectionStateWatcher to be called when the state of a collection changes
*
* A given CollectionStateWatcher will be only called once. If you want to have a persistent watcher,
* it should register itself again in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
* method.
*/
public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
collectionWatches.compute(collection, (k, v) -> {
if (v == null) {
v = new CollectionWatch();
watchSet.set(true);
}
v.stateWatchers.add(stateWatcher);
return v;
});
if (watchSet.get()) {
new StateWatcher(collection).refreshAndWatch();
synchronized (getUpdateLock()) {
constructState();
}
}
}
/**
* Block until a CollectionStatePredicate returns true, or the wait times out
*
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
*
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
* @param predicate the predicate to call on state changes
* @throws InterruptedException on interrupt
* @throws TimeoutException on timeout
*/
public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
throws InterruptedException, TimeoutException {
final CountDownLatch latch = new CountDownLatch(1);
CollectionStateWatcher watcher = new CollectionStateWatcher() {
@Override
public void onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
if (predicate.matches(liveNodes, collectionState)) {
latch.countDown();
} else {
registerCollectionStateWatcher(collection, this);
}
}
};
registerCollectionStateWatcher(collection, watcher);
try {
// check the current state
DocCollection dc = clusterState.getCollectionOrNull(collection);
if (predicate.matches(liveNodes, dc))
return;
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit))
throw new TimeoutException();
}
finally {
removeCollectionStateWatcher(collection, watcher);
}
}
/**
* Remove a watcher from a collection's watch list.
*
* This allows Zookeeper watches to be removed if there is no interest in the
* collection.
*
* @param collection the collection
* @param watcher the watcher
*/
public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
collectionWatches.compute(collection, (k, v) -> {
if (v == null)
return null;
v.stateWatchers.remove(watcher);
if (v.canBeRemoved())
return null;
return v;
});
}
/* package-private for testing */
Set<CollectionStateWatcher> getStateWatchers(String collection) {
CollectionWatch watch = collectionWatches.get(collection);
if (watch == null)
return null;
return new HashSet<>(watch.stateWatchers);
}
// returns true if the state has changed
private void updateWatchedCollection(String coll, DocCollection newState) {
Set<String> liveNodes = this.liveNodes; // volatile read
if (newState == null) {
LOG.info("Deleting data for [{}]", coll);
watchedCollectionStates.remove(coll);
notifyStateWatchers(liveNodes, coll, null);
return;
}
// CAS update loop
while (true) {
if (!interestingCollections.contains(coll)) {
if (!collectionWatches.containsKey(coll)) {
break;
}
DocCollection oldState = watchedCollectionStates.get(coll);
if (oldState == null) {
if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
notifyStateWatchers(liveNodes, coll, newState);
break;
}
} else {
@ -1106,27 +1294,18 @@ public class ZkStateReader implements Closeable {
}
if (watchedCollectionStates.replace(coll, oldState, newState)) {
LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
notifyStateWatchers(liveNodes, coll, newState);
break;
}
}
}
// Resolve race with removeZKWatch.
if (!interestingCollections.contains(coll)) {
// Resolve race with unregisterCore.
if (!collectionWatches.containsKey(coll)) {
watchedCollectionStates.remove(coll);
LOG.info("Removing uninteresting collection [{}]", coll);
}
}
/** This is not a public API. Only used by ZkController */
public void removeZKWatch(String coll) {
LOG.info("Removing watch for uninteresting collection [{}]", coll);
interestingCollections.remove(coll);
watchedCollectionStates.remove(coll);
lazyCollectionStates.put(coll, new LazyCollectionRef(coll));
synchronized (getUpdateLock()) {
constructState();
}
}
public static class ConfigData {
@ -1142,4 +1321,45 @@ public class ZkStateReader implements Closeable {
}
}
private void notifyStateWatchers(Set<String> liveNodes, String collection, DocCollection collectionState) {
try {
notifications.submit(new Notification(liveNodes, collection, collectionState));
}
catch (RejectedExecutionException e) {
if (closed == false) {
LOG.error("Couldn't run collection notifications for {}", collection, e);
}
}
}
private class Notification implements Runnable {
final Set<String> liveNodes;
final String collection;
final DocCollection collectionState;
private Notification(Set<String> liveNodes, String collection, DocCollection collectionState) {
this.liveNodes = liveNodes;
this.collection = collection;
this.collectionState = collectionState;
}
@Override
public void run() {
List<CollectionStateWatcher> watchers = new ArrayList<>();
collectionWatches.compute(collection, (k, v) -> {
if (v == null)
return null;
watchers.addAll(v.stateWatchers);
v.stateWatchers.clear();
return v;
});
for (CollectionStateWatcher watcher : watchers) {
watcher.onStateChanged(liveNodes, collectionState);
}
}
}
}

View File

@ -19,16 +19,9 @@ package org.apache.solr.common.util;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
@ -153,6 +146,13 @@ public class ExecutorUtil {
threadFactory);
}
/**
* Create a cached thread pool using a named thread factory
*/
public static ExecutorService newMDCAwareCachedThreadPool(String name) {
return newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(name));
}
/**
* See {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)}
*/

View File

@ -0,0 +1,238 @@
package org.apache.solr.common.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.core.Is.is;
public class TestCollectionStateWatchers extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int CLUSTER_SIZE = 4;
private static final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers");
private static final int MAX_WAIT_TIMEOUT = 30;
@BeforeClass
public static void startCluster() throws Exception {
configureCluster(CLUSTER_SIZE)
.addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
.configure();
}
@AfterClass
public static void shutdownBackgroundExecutors() {
executor.shutdown();
}
@Before
public void prepareCluster() throws Exception {
int missingServers = CLUSTER_SIZE - cluster.getJettySolrRunners().size();
for (int i = 0; i < missingServers; i++) {
cluster.startJettySolrRunner();
}
cluster.waitForAllNodes(30);
}
private static Future<Boolean> waitInBackground(String collection, long timeout, TimeUnit unit,
CollectionStatePredicate predicate) {
return executor.submit(() -> {
try {
cluster.getSolrClient().waitForState(collection, timeout, unit, predicate);
} catch (InterruptedException | TimeoutException e) {
return Boolean.FALSE;
}
return Boolean.TRUE;
});
}
@Test
public void testSimpleCollectionWatch() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
cluster.createCollection("testcollection", CLUSTER_SIZE, 1, "config", new HashMap<>());
client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive);
// shutdown a node and check that we get notified about the change
final AtomicInteger nodeCount = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(1);
client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> {
// we can't just count liveNodes here, because that's updated by a separate watcher,
// and it may be the case that we're triggered by a node setting itself to DOWN before
// the liveNodes watcher is called
log.info("State changed: {}", collectionState);
for (Slice slice : collectionState) {
for (Replica replica : slice) {
if (replica.isActive(liveNodes))
nodeCount.incrementAndGet();
}
}
latch.countDown();
});
cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertThat(nodeCount.intValue(), is(3));
}
@Test
public void testWaitForStateChecksCurrentState() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
cluster.createCollection("waitforstate", 1, 1, "config", new HashMap<>());
client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive);
// several goes, to check that we're not getting delayed state changes
for (int i = 0; i < 10; i++) {
try {
client.waitForState("waitforstate", 1, TimeUnit.SECONDS, DocCollection::isFullyActive);
}
catch (TimeoutException e) {
fail("waitForState should return immediately if the predicate is already satisfied");
}
}
}
@Test
public void testCanWaitForNonexistantCollection() throws Exception {
Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive);
cluster.createCollection("delayed", 1, 1, "config", new HashMap<>());
assertTrue("waitForState was not triggered by collection creation", future.get());
}
@Test
public void testPredicateFailureTimesOut() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
expectThrows(TimeoutException.class, () -> {
client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false));
});
Set<CollectionStateWatcher> watchers = client.getZkStateReader().getStateWatchers("nosuchcollection");
assertTrue("Watchers for collection should be removed after timeout",
watchers == null || watchers.size() == 0);
}
@Test
public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
cluster.createCollection("falsepredicate", 4, 1, "config", new HashMap<>());
client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive);
final CountDownLatch firstCall = new CountDownLatch(1);
// stop a node, then add a watch waiting for all nodes to be back up
JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
Future<Boolean> future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
firstCall.countDown();
return DocCollection.isFullyActive(liveNodes, collectionState);
});
// first, stop another node; the watch should not be fired after this!
JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
// now start them both back up
cluster.startJettySolrRunner(node1);
assertTrue("CollectionStateWatcher not called after 30 seconds", firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
cluster.startJettySolrRunner(node2);
Boolean result = future.get();
assertTrue("Did not see a fully active cluster after 30 seconds", result);
}
@Test
public void testWatcherIsRemovedAfterTimeout() {
CloudSolrClient client = cluster.getSolrClient();
assertTrue("There should be no watchers for a non-existent collection!",
client.getZkStateReader().getStateWatchers("no-such-collection") == null);
expectThrows(TimeoutException.class, () -> {
client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, DocCollection::isFullyActive);
});
Set<CollectionStateWatcher> watchers = client.getZkStateReader().getStateWatchers("no-such-collection");
assertTrue("Watchers for collection should be removed after timeout",
watchers == null || watchers.size() == 0);
}
@Test
public void testDeletionsTriggerWatches() throws Exception {
cluster.createCollection("tobedeleted", 1, 1, "config", new HashMap<>());
Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (l, c) -> c == null);
CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient());
assertTrue("CollectionStateWatcher not notified of delete call after 30 seconds", future.get());
}
@Test
public void testWatchesWorkForStateFormat1() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
Future<Boolean> future
= waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive);
CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation", future.get());
Future<Boolean> migrated
= waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> c != null && c.getStateFormat() == 2);
CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT);
assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get());
}
}

View File

@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
@ -181,8 +182,8 @@ public class MiniSolrCloudCluster {
*/
public MiniSolrCloudCluster(int numServers, Path baseDir, String solrXml, JettyConfig jettyConfig, ZkTestServer zkTestServer) throws Exception {
this.baseDir = baseDir;
this.jettyConfig = jettyConfig;
this.baseDir = Objects.requireNonNull(baseDir);
this.jettyConfig = Objects.requireNonNull(jettyConfig);
Files.createDirectories(baseDir);
@ -194,8 +195,7 @@ public class MiniSolrCloudCluster {
}
this.zkServer = zkTestServer;
try(SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(),
AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT, null)) {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) {
zkClient.makePath("/solr/solr.xml", solrXml.getBytes(Charset.defaultCharset()), true);
if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) {
zkClient.makePath("/solr" + ZkStateReader.CLUSTER_PROPS, "{'urlScheme':'https'}".getBytes(Charsets.UTF_8), true);
@ -222,12 +222,17 @@ public class MiniSolrCloudCluster {
throw startupError;
}
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(),
AbstractZkTestCase.TIMEOUT, 45000, null)) {
waitForAllNodes(numServers, 60);
solrClient = buildSolrClient();
}
private void waitForAllNodes(int numServers, int timeout) throws IOException, InterruptedException {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) {
int numliveNodes = 0;
int retries = 60;
int retries = timeout;
String liveNodesPath = "/solr/live_nodes";
// Wait up to 60 seconds for number of live_nodes to match up number of servers
// Wait up to {timeout} seconds for number of live_nodes to match up number of servers
do {
if (zkClient.exists(liveNodesPath, true)) {
numliveNodes = zkClient.getChildren(liveNodesPath, null, true).size();
@ -244,8 +249,13 @@ public class MiniSolrCloudCluster {
Thread.sleep(1000);
} while (numliveNodes != numServers);
}
catch (KeeperException e) {
throw new IOException("Error communicating with zookeeper", e);
}
}
solrClient = buildSolrClient();
public void waitForAllNodes(int timeout) throws IOException, InterruptedException {
waitForAllNodes(jettys.size(), timeout);
}
private String newNodeName() {
@ -348,7 +358,13 @@ public class MiniSolrCloudCluster {
return jetty;
}
protected JettySolrRunner startJettySolrRunner(JettySolrRunner jetty) throws Exception {
/**
* Add a previously stopped node back to the cluster
* @param jetty a {@link JettySolrRunner} previously returned by {@link #stopJettySolrRunner(int)}
* @return the started node
* @throws Exception on error
*/
public JettySolrRunner startJettySolrRunner(JettySolrRunner jetty) throws Exception {
jetty.start();
jettys.add(jetty);
return jetty;