SOLR-9113: Improve state watcher API

This commit is contained in:
Alan Woodward 2016-05-17 10:33:37 +01:00
parent 6942fe2d20
commit 2b9cbe97a8
5 changed files with 61 additions and 37 deletions

View File

@ -140,7 +140,7 @@ New Features
* SOLR-8208: [subquery] document transformer executes separate requests per result document. (Cao Manh Dat via Mikhail Khludnev)
* SOLR-8323: All CollectionStateWatcher API (Alan Woodward, Scott Blum)
* SOLR-8323, SOLR-9113: Add CollectionStateWatcher API (Alan Woodward, Scott Blum)
Bug Fixes
----------------------

View File

@ -682,7 +682,6 @@ public final class ZkController {
InterruptedException {
publishNodeAsDown(getNodeName());
// now wait till the updates are in our state
long now = System.nanoTime();

View File

@ -31,12 +31,12 @@ public interface CollectionStateWatcher {
* Note that, due to the way Zookeeper watchers are implemented, a single call may be
* the result of several state changes
*
* A watcher is unregistered after it has been called once. To make a watcher persistent,
* implementors should re-register during this call.
*
* @param liveNodes the set of live nodes
* @param collectionState the new collection state
* @param collectionState the new collection state (may be null if the collection has been
* deleted)
*
* @return true if the watcher should be removed
*/
void onStateChanged(Set<String> liveNodes, DocCollection collectionState);
boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState);
}

View File

@ -1145,10 +1145,6 @@ public class ZkStateReader implements Closeable {
/**
* Register a CollectionStateWatcher to be called when the state of a collection changes
*
* A given CollectionStateWatcher will be only called once. If you want to have a persistent watcher,
* it should register itself again in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
* method.
*/
public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
@ -1166,6 +1162,12 @@ public class ZkStateReader implements Closeable {
constructState();
}
}
else {
DocCollection state = clusterState.getCollectionOrNull(collection);
if (stateWatcher.onStateChanged(liveNodes, state) == true) {
removeCollectionStateWatcher(collection, stateWatcher);
}
}
}
/**
@ -1186,24 +1188,15 @@ public class ZkStateReader implements Closeable {
final CountDownLatch latch = new CountDownLatch(1);
CollectionStateWatcher watcher = new CollectionStateWatcher() {
@Override
public void onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
if (predicate.matches(liveNodes, collectionState)) {
latch.countDown();
} else {
registerCollectionStateWatcher(collection, this);
}
}
CollectionStateWatcher watcher = (n, c) -> {
boolean matches = predicate.matches(n, c);
if (matches)
latch.countDown();
return matches;
};
registerCollectionStateWatcher(collection, watcher);
try {
// check the current state
DocCollection dc = clusterState.getCollectionOrNull(collection);
if (predicate.matches(liveNodes, dc))
return;
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit))
throw new TimeoutException();
@ -1268,7 +1261,9 @@ public class ZkStateReader implements Closeable {
}
} else {
if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
// Nothing to do, someone else updated same or newer.
// no change to state, but we might have been triggered by the addition of a
// state watcher, so run notifications
notifyStateWatchers(liveNodes, coll, newState);
break;
}
if (watchedCollectionStates.replace(coll, oldState, newState)) {
@ -1335,7 +1330,9 @@ public class ZkStateReader implements Closeable {
return v;
});
for (CollectionStateWatcher watcher : watchers) {
watcher.onStateChanged(liveNodes, collectionState);
if (watcher.onStateChanged(liveNodes, collectionState) == false) {
registerCollectionStateWatcher(collection, watcher);
}
}
}

View File

@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@ -39,8 +38,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.core.Is.is;
public class TestCollectionStateWatchers extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -96,29 +93,60 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
// shutdown a node and check that we get notified about the change
final AtomicInteger nodeCount = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(1);
client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> {
// we can't just count liveNodes here, because that's updated by a separate watcher,
// and it may be the case that we're triggered by a node setting itself to DOWN before
// the liveNodes watcher is called
int nodeCount = 0;
log.info("State changed: {}", collectionState);
for (Slice slice : collectionState) {
for (Replica replica : slice) {
if (replica.isActive(liveNodes))
nodeCount.incrementAndGet();
nodeCount++;
}
}
latch.countDown();
if (nodeCount == 3) {
latch.countDown();
return true;
}
return false;
});
cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertThat(nodeCount.intValue(), is(3));
assertEquals("CollectionStateWatcher wasn't cleared after completion",
0, client.getZkStateReader().getStateWatchers("testcollection").size());
}
@Test
public void testStateWatcherChecksCurrentStateOnRegister() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
final CountDownLatch latch = new CountDownLatch(1);
client.registerCollectionStateWatcher("currentstate", (n, c) -> {
latch.countDown();
return false;
});
assertTrue("CollectionStateWatcher isn't called on new registration", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertEquals("CollectionStateWatcher should be retained",
1, client.getZkStateReader().getStateWatchers("currentstate").size());
final CountDownLatch latch2 = new CountDownLatch(1);
client.registerCollectionStateWatcher("currentstate", (n, c) -> {
latch2.countDown();
return true;
});
assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertEquals("CollectionStateWatcher should be removed",
1, client.getZkStateReader().getStateWatchers("currentstate").size());
}
@Test
public void testWaitForStateChecksCurrentState() throws Exception {