mirror of
https://github.com/apache/lucene.git
synced 2025-02-10 03:55:46 +00:00
SOLR-11924: Added CloudCollectionsListener to watch the list of collections in a cloud. This closes #313
This commit is contained in:
parent
d904112428
commit
ae0190b696
@ -0,0 +1,40 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.common.cloud;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback registered with {@link ZkStateReader#registerCloudCollectionsListener(CloudCollectionsListener)}
|
||||||
|
* and called whenever the cloud's set of collections changes.
|
||||||
|
*/
|
||||||
|
public interface CloudCollectionsListener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when a collection is created, a collection is deleted or a watched collection's state changes.
|
||||||
|
*
|
||||||
|
* Note that, due to the way Zookeeper watchers are implemented, a single call may be
|
||||||
|
* the result of multiple or no collection creation or deletions. Also, multiple calls to this method can be made
|
||||||
|
* with the same set of collections, ie. without any new updates.
|
||||||
|
*
|
||||||
|
* @param oldCollections the previous set of collections
|
||||||
|
* @param newCollections the new set of collections
|
||||||
|
*/
|
||||||
|
void onChange(Set<String> oldCollections, Set<String> newCollections);
|
||||||
|
|
||||||
|
}
|
@ -171,6 +171,8 @@ public class ZkStateReader implements Closeable {
|
|||||||
|
|
||||||
private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsWatches = new ConcurrentHashMap<>();
|
private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsWatches = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
|
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
|
||||||
|
|
||||||
/** Used to submit notifications to Collection Properties watchers in order **/
|
/** Used to submit notifications to Collection Properties watchers in order **/
|
||||||
@ -545,6 +547,8 @@ public class ZkStateReader implements Closeable {
|
|||||||
clusterState.getCollectionStates());
|
clusterState.getCollectionStates());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
notifyCloudCollectionsListeners();
|
||||||
|
|
||||||
for (String collection : changedCollections) {
|
for (String collection : changedCollections) {
|
||||||
notifyStateWatchers(liveNodes, collection, clusterState.getCollectionOrNull(collection));
|
notifyStateWatchers(liveNodes, collection, clusterState.getCollectionOrNull(collection));
|
||||||
}
|
}
|
||||||
@ -650,6 +654,52 @@ public class ZkStateReader implements Closeable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We don't get a Stat or track versions on getChildren() calls, so force linearization.
|
||||||
|
private final Object refreshCollectionsSetLock = new Object();
|
||||||
|
// Ensures that only the latest getChildren fetch gets applied.
|
||||||
|
private final AtomicReference<Set<String>> lastFetchedCollectionSet = new AtomicReference<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a CloudCollectionsListener to be called when the set of collections within a cloud changes.
|
||||||
|
*/
|
||||||
|
public void registerCloudCollectionsListener(CloudCollectionsListener cloudCollectionsListener) {
|
||||||
|
cloudCollectionsListeners.add(cloudCollectionsListener);
|
||||||
|
notifyNewCloudCollectionsListener(cloudCollectionsListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a registered CloudCollectionsListener.
|
||||||
|
*/
|
||||||
|
public void removeCloudCollectionsListener(CloudCollectionsListener cloudCollectionsListener) {
|
||||||
|
cloudCollectionsListeners.remove(cloudCollectionsListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyNewCloudCollectionsListener(CloudCollectionsListener listener) {
|
||||||
|
listener.onChange(Collections.emptySet(), lastFetchedCollectionSet.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyCloudCollectionsListeners() {
|
||||||
|
notifyCloudCollectionsListeners(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyCloudCollectionsListeners(boolean notifyIfSame) {
|
||||||
|
synchronized (refreshCollectionsSetLock) {
|
||||||
|
final Set<String> newCollections = getCurrentCollections();
|
||||||
|
final Set<String> oldCollections = lastFetchedCollectionSet.getAndSet(newCollections);
|
||||||
|
if (!newCollections.equals(oldCollections) || notifyIfSame) {
|
||||||
|
cloudCollectionsListeners.forEach(listener -> listener.onChange(oldCollections, newCollections));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> getCurrentCollections() {
|
||||||
|
Set<String> collections = new HashSet<>();
|
||||||
|
collections.addAll(legacyCollectionStates.keySet());
|
||||||
|
collections.addAll(watchedCollectionStates.keySet());
|
||||||
|
collections.addAll(lazyCollectionStates.keySet());
|
||||||
|
return collections;
|
||||||
|
}
|
||||||
|
|
||||||
private class LazyCollectionRef extends ClusterState.CollectionRef {
|
private class LazyCollectionRef extends ClusterState.CollectionRef {
|
||||||
private final String collName;
|
private final String collName;
|
||||||
private long lastUpdateTime;
|
private long lastUpdateTime;
|
||||||
|
@ -0,0 +1,311 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.common.cloud;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.util.ExecutorUtil;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class TestCloudCollectionsListeners extends SolrCloudTestCase {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static final int CLUSTER_SIZE = 4;
|
||||||
|
|
||||||
|
private static final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers");
|
||||||
|
|
||||||
|
private static final int MAX_WAIT_TIMEOUT = 30;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void startCluster() throws Exception {
|
||||||
|
configureCluster(CLUSTER_SIZE)
|
||||||
|
.addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
|
||||||
|
.configure();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void shutdownBackgroundExecutors() {
|
||||||
|
executor.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void prepareCluster() throws Exception {
|
||||||
|
int missingServers = CLUSTER_SIZE - cluster.getJettySolrRunners().size();
|
||||||
|
for (int i = 0; i < missingServers; i++) {
|
||||||
|
cluster.startJettySolrRunner();
|
||||||
|
}
|
||||||
|
cluster.waitForAllNodes(30);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleCloudCollectionsListener() throws Exception {
|
||||||
|
|
||||||
|
CloudSolrClient client = cluster.getSolrClient();
|
||||||
|
|
||||||
|
Map<Integer, Set<String>> oldResults = new HashMap<>();
|
||||||
|
Map<Integer, Set<String>> newResults = new HashMap<>();
|
||||||
|
|
||||||
|
CloudCollectionsListener watcher1 = (oldCollections, newCollections) -> {
|
||||||
|
log.info("New set of collections: {}, {}", oldCollections, newCollections);
|
||||||
|
oldResults.put(1, oldCollections);
|
||||||
|
newResults.put(1, newCollections);
|
||||||
|
};
|
||||||
|
client.getZkStateReader().registerCloudCollectionsListener(watcher1);
|
||||||
|
CloudCollectionsListener watcher2 = (oldCollections, newCollections) -> {
|
||||||
|
log.info("New set of collections: {}, {}", oldCollections, newCollections);
|
||||||
|
oldResults.put(2, oldCollections);
|
||||||
|
newResults.put(2, newCollections);
|
||||||
|
};
|
||||||
|
client.getZkStateReader().registerCloudCollectionsListener(watcher2);
|
||||||
|
|
||||||
|
assertFalse("CloudCollectionsListener not triggered after registration", oldResults.get(1).contains("testcollection1"));
|
||||||
|
assertFalse("CloudCollectionsListener not triggered after registration", oldResults.get(2).contains("testcollection1"));
|
||||||
|
|
||||||
|
assertFalse("CloudCollectionsListener not triggered after registration", newResults.get(1).contains("testcollection1"));
|
||||||
|
assertFalse("CloudCollectionsListener not triggered after registration", newResults.get(2).contains("testcollection1"));
|
||||||
|
|
||||||
|
CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
|
||||||
|
.processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
client.waitForState("testcollection1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
|
||||||
|
|
||||||
|
assertFalse("CloudCollectionsListener has new collection in old set of collections", oldResults.get(1).contains("testcollection1"));
|
||||||
|
assertFalse("CloudCollectionsListener has new collection in old set of collections", oldResults.get(2).contains("testcollection1"));
|
||||||
|
|
||||||
|
assertTrue("CloudCollectionsListener doesn't have new collection in new set of collections", newResults.get(1).contains("testcollection1"));
|
||||||
|
assertTrue("CloudCollectionsListener doesn't have new collection in new set of collections", newResults.get(2).contains("testcollection1"));
|
||||||
|
|
||||||
|
client.getZkStateReader().removeCloudCollectionsListener(watcher1);
|
||||||
|
|
||||||
|
CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
|
||||||
|
.processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
client.waitForState("testcollection2", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
|
||||||
|
|
||||||
|
|
||||||
|
assertFalse("CloudCollectionsListener notified after removal", oldResults.get(1).contains("testcollection1"));
|
||||||
|
assertTrue("CloudCollectionsListener does not contain old collection in list of old collections", oldResults.get(2).contains("testcollection1"));
|
||||||
|
assertFalse("CloudCollectionsListener contains new collection in old collection set", oldResults.get(1).contains("testcollection2"));
|
||||||
|
assertFalse("CloudCollectionsListener contains new collection in old collection set", oldResults.get(2).contains("testcollection2"));
|
||||||
|
|
||||||
|
assertFalse("CloudCollectionsListener notified after removal", newResults.get(1).contains("testcollection2"));
|
||||||
|
assertTrue("CloudCollectionsListener does not contain new collection in list of new collections", newResults.get(2).contains("testcollection2"));
|
||||||
|
|
||||||
|
CollectionAdminRequest.deleteCollection("testcollection1").processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
|
||||||
|
CollectionAdminRequest.deleteCollection("testcollection2").processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
|
||||||
|
client.getZkStateReader().removeCloudCollectionsListener(watcher2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCollectionDeletion() throws Exception {
|
||||||
|
|
||||||
|
CloudSolrClient client = cluster.getSolrClient();
|
||||||
|
|
||||||
|
CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
|
||||||
|
.processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
client.waitForState("testcollection1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
|
||||||
|
|
||||||
|
CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
|
||||||
|
.processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
client.waitForState("testcollection2", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
|
||||||
|
|
||||||
|
Map<Integer, Set<String>> oldResults = new HashMap<>();
|
||||||
|
Map<Integer, Set<String>> newResults = new HashMap<>();
|
||||||
|
|
||||||
|
CloudCollectionsListener watcher1 = (oldCollections, newCollections) -> {
|
||||||
|
log.info("New set of collections: {}, {}", oldCollections, newCollections);
|
||||||
|
oldResults.put(1, oldCollections);
|
||||||
|
newResults.put(1, newCollections);
|
||||||
|
};
|
||||||
|
client.getZkStateReader().registerCloudCollectionsListener(watcher1);
|
||||||
|
CloudCollectionsListener watcher2 = (oldCollections, newCollections) -> {
|
||||||
|
log.info("New set of collections: {}, {}", oldCollections, newCollections);
|
||||||
|
oldResults.put(2, oldCollections);
|
||||||
|
newResults.put(2, newCollections);
|
||||||
|
};
|
||||||
|
client.getZkStateReader().registerCloudCollectionsListener(watcher2);
|
||||||
|
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener has old collection with size > 0 after registration", 0, oldResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener has old collection with size > 0 after registration", 0, oldResults.get(2).size());
|
||||||
|
|
||||||
|
assertTrue("CloudCollectionsListener not notified of all collections after registration", newResults.get(1).contains("testcollection1"));
|
||||||
|
assertTrue("CloudCollectionsListener not notified of all collections after registration", newResults.get(1).contains("testcollection2"));
|
||||||
|
assertTrue("CloudCollectionsListener not notified of all collections after registration", newResults.get(2).contains("testcollection1"));
|
||||||
|
assertTrue("CloudCollectionsListener not notified of all collections after registration", newResults.get(2).contains("testcollection2"));
|
||||||
|
|
||||||
|
CollectionAdminRequest.deleteCollection("testcollection1").processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener missing old collection after collection removal", 2, oldResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener missing old collection after collection removal", 2, oldResults.get(2).size());
|
||||||
|
|
||||||
|
assertFalse("CloudCollectionsListener notifies with collection that no longer exists", newResults.get(1).contains("testcollection1"));
|
||||||
|
assertTrue("CloudCollectionsListener doesn't notify of collection that exists", newResults.get(1).contains("testcollection2"));
|
||||||
|
assertFalse("CloudCollectionsListener notifies with collection that no longer exists", newResults.get(2).contains("testcollection1"));
|
||||||
|
assertTrue("CloudCollectionsListener doesn't notify of collection that exists", newResults.get(2).contains("testcollection2"));
|
||||||
|
|
||||||
|
client.getZkStateReader().removeCloudCollectionsListener(watcher2);
|
||||||
|
|
||||||
|
CollectionAdminRequest.deleteCollection("testcollection2").processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener has incorrect number of old collections", 1, oldResults.get(1).size());
|
||||||
|
assertTrue("CloudCollectionsListener has incorrect old collection after collection removal", oldResults.get(1).contains("testcollection2"));
|
||||||
|
assertEquals("CloudCollectionsListener called after removal", 2, oldResults.get(2).size());
|
||||||
|
|
||||||
|
assertFalse("CloudCollectionsListener shows live collection after removal", newResults.get(1).contains("testcollection1"));
|
||||||
|
assertFalse("CloudCollectionsListener shows live collection after removal", newResults.get(1).contains("testcollection2"));
|
||||||
|
assertFalse("CloudCollectionsListener called after removal", newResults.get(2).contains("testcollection1"));
|
||||||
|
assertTrue("CloudCollectionsListener called after removal", newResults.get(2).contains("testcollection2"));
|
||||||
|
|
||||||
|
client.getZkStateReader().removeCloudCollectionsListener(watcher1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWatchesWorkForBothStateFormats() throws Exception {
|
||||||
|
CloudSolrClient client = cluster.getSolrClient();
|
||||||
|
|
||||||
|
Map<Integer, Set<String>> oldResults = new HashMap<>();
|
||||||
|
Map<Integer, Set<String>> newResults = new HashMap<>();
|
||||||
|
|
||||||
|
CloudCollectionsListener watcher1 = (oldCollections, newCollections) -> {
|
||||||
|
log.info("New set of collections: {}, {}", oldCollections, newCollections);
|
||||||
|
oldResults.put(1, oldCollections);
|
||||||
|
newResults.put(1, newCollections);
|
||||||
|
};
|
||||||
|
client.getZkStateReader().registerCloudCollectionsListener(watcher1);
|
||||||
|
CloudCollectionsListener watcher2 = (oldCollections, newCollections) -> {
|
||||||
|
log.info("New set of collections: {}, {}", oldCollections, newCollections);
|
||||||
|
oldResults.put(2, oldCollections);
|
||||||
|
newResults.put(2, newCollections);
|
||||||
|
};
|
||||||
|
client.getZkStateReader().registerCloudCollectionsListener(watcher2);
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener has old collections with size > 0 after registration", 0, oldResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener has old collections with size > 0 after registration", 0, oldResults.get(2).size());
|
||||||
|
assertEquals("CloudCollectionsListener has new collections with size > 0 after registration", 0, newResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener has new collections with size > 0 after registration", 0, newResults.get(2).size());
|
||||||
|
|
||||||
|
// Creating old state format collection
|
||||||
|
|
||||||
|
CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
|
||||||
|
.setStateFormat(1)
|
||||||
|
.processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
client.waitForState("testcollection1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener has old collections with size > 0 after collection created with old stateFormat", 0, oldResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener has old collections with size > 0 after collection created with old stateFormat", 0, oldResults.get(2).size());
|
||||||
|
assertEquals("CloudCollectionsListener not updated with created collection with old stateFormat", 1, newResults.get(1).size());
|
||||||
|
assertTrue("CloudCollectionsListener not updated with created collection with old stateFormat", newResults.get(1).contains("testcollection1"));
|
||||||
|
assertEquals("CloudCollectionsListener not updated with created collection with old stateFormat", 1, newResults.get(2).size());
|
||||||
|
assertTrue("CloudCollectionsListener not updated with created collection with old stateFormat", newResults.get(2).contains("testcollection1"));
|
||||||
|
|
||||||
|
// Creating new state format collection
|
||||||
|
|
||||||
|
CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
|
||||||
|
.processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
client.waitForState("testcollection2", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener has incorrect old collections after collection created with new stateFormat", 1, oldResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener has incorrect old collections after collection created with new stateFormat", 1, oldResults.get(2).size());
|
||||||
|
assertEquals("CloudCollectionsListener not updated with created collection with new stateFormat", 2, newResults.get(1).size());
|
||||||
|
assertTrue("CloudCollectionsListener not updated with created collection with new stateFormat", newResults.get(1).contains("testcollection2"));
|
||||||
|
assertEquals("CloudCollectionsListener not updated with created collection with new stateFormat", 2, newResults.get(2).size());
|
||||||
|
assertTrue("CloudCollectionsListener not updated with created collection with new stateFormat", newResults.get(2).contains("testcollection2"));
|
||||||
|
|
||||||
|
client.getZkStateReader().removeCloudCollectionsListener(watcher2);
|
||||||
|
|
||||||
|
// Creating old state format collection
|
||||||
|
|
||||||
|
CollectionAdminRequest.createCollection("testcollection3", "config", 4, 1)
|
||||||
|
.setStateFormat(1)
|
||||||
|
.processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
client.waitForState("testcollection1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener has incorrect old collections after collection created with old stateFormat", 2, oldResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener updated after removal", 1, oldResults.get(2).size());
|
||||||
|
assertEquals("CloudCollectionsListener not updated with created collection with old stateFormat", 3, newResults.get(1).size());
|
||||||
|
assertTrue("CloudCollectionsListener not updated with created collection with old stateFormat", newResults.get(1).contains("testcollection3"));
|
||||||
|
assertEquals("CloudCollectionsListener updated after removal", 2, newResults.get(2).size());
|
||||||
|
assertFalse("CloudCollectionsListener updated after removal", newResults.get(2).contains("testcollection3"));
|
||||||
|
|
||||||
|
// Adding back listener
|
||||||
|
client.getZkStateReader().registerCloudCollectionsListener(watcher2);
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener has old collections after registration", 0, oldResults.get(2).size());
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have all collections after registration", 3, newResults.get(2).size());
|
||||||
|
|
||||||
|
// Deleting old state format collection
|
||||||
|
|
||||||
|
CollectionAdminRequest.deleteCollection("testcollection1").processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 3, oldResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 3, oldResults.get(2).size());
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 2, newResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 2, newResults.get(2).size());
|
||||||
|
assertFalse("CloudCollectionsListener not updated with deleted collection with old stateFormat", newResults.get(1).contains("testcollection1"));
|
||||||
|
assertFalse("CloudCollectionsListener not updated with deleted collection with old stateFormat", newResults.get(2).contains("testcollection1"));
|
||||||
|
|
||||||
|
CollectionAdminRequest.deleteCollection("testcollection2").processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 2, oldResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 2, oldResults.get(2).size());
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 1, newResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 1, newResults.get(2).size());
|
||||||
|
assertFalse("CloudCollectionsListener not updated with deleted collection with new stateFormat", newResults.get(1).contains("testcollection2"));
|
||||||
|
assertFalse("CloudCollectionsListener not updated with deleted collection with new stateFormat", newResults.get(2).contains("testcollection2"));
|
||||||
|
|
||||||
|
client.getZkStateReader().removeCloudCollectionsListener(watcher1);
|
||||||
|
|
||||||
|
CollectionAdminRequest.deleteCollection("testcollection3").processAndWait(client, MAX_WAIT_TIMEOUT);
|
||||||
|
|
||||||
|
assertEquals("CloudCollectionsListener updated after removal", 2, oldResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 1, oldResults.get(2).size());
|
||||||
|
assertEquals("CloudCollectionsListener updated after removal", 1, newResults.get(1).size());
|
||||||
|
assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 0, newResults.get(2).size());
|
||||||
|
assertTrue("CloudCollectionsListener updated after removal", newResults.get(1).contains("testcollection3"));
|
||||||
|
assertFalse("CloudCollectionsListener not updated with deleted collection with old stateFormat", newResults.get(2).contains("testcollection3"));
|
||||||
|
|
||||||
|
client.getZkStateReader().removeCloudCollectionsListener(watcher2);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user