From af8a9624178e5e62b652ce3799c6e705056a3ae8 Mon Sep 17 00:00:00 2001 From: Timothy Potter Date: Thu, 14 Apr 2016 10:17:42 -0700 Subject: [PATCH] SOLR-8908: Fixed OnReconnect listener management in ZkController to allow for de-registering listeners. Here's what this commit includes: * Added the removeOnReconnectListener method to ZkController to allow OnReconnect listener implementations to de-register; avoids a memory leak * Updated ZkIndexSchemaReader to add a CloseHook to the SolrCore it supports to de-register as an OnReconnect listener * Added unit test to verify that after reloading and deleting a SolrCore in managed schema mode, the associated ZkIndexSchemaReader gets de-registered correctly --- solr/CHANGES.txt | 3 + .../org/apache/solr/cloud/ZkController.java | 66 +++++++- .../schema/ManagedIndexSchemaFactory.java | 4 +- .../solr/schema/ZkIndexSchemaReader.java | 56 ++++++- .../solr/cloud/CollectionReloadTest.java | 85 +--------- .../cloud/DistributedVersionInfoTest.java | 16 -- .../apache/solr/cloud/HttpPartitionTest.java | 16 -- .../cloud/TestOnReconnectListenerSupport.java | 155 ++++++++++++++++++ .../apache/solr/common/cloud/OnReconnect.java | 9 +- .../cloud/AbstractFullDistribZkTestBase.java | 75 +++++++++ 10 files changed, 360 insertions(+), 125 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 0f4bc1a5000..b36bf6135eb 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -590,6 +590,9 @@ Bug Fixes * SOLR-8870: AngularJS Query tab no longer URL-encodes the /select part of the request, fixing possible 404 issue when Solr is behind a proxy. Also, now supports old-style &qt param when handler not prefixed with "/" (janhoy) +* SOLR-8908: Fix to OnReconnect listener registration to allow listeners to deregister, such + as when a core is reloaded or deleted to avoid a memory leak. (Timothy Potter) + ======================= 5.5.0 ======================= Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index e288f5e926d..ab9422d8ead 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -183,7 +183,8 @@ public final class ZkController { private boolean zkRunOnly = Boolean.getBoolean("zkRunOnly"); // expert // keeps track of a list of objects that need to know a new ZooKeeper session was created after expiration occurred - private List reconnectListeners = new ArrayList(); + // ref is held as a HashSet since we clone the set before notifying to avoid synchronizing too long + private HashSet reconnectListeners = new HashSet(); private class RegisterCoreAsync implements Callable { @@ -204,6 +205,22 @@ public final class ZkController { } } + // notifies registered listeners after the ZK reconnect in the background + private class OnReconnectNotifyAsync implements Callable { + + private final OnReconnect listener; + + OnReconnectNotifyAsync(OnReconnect listener) { + this.listener = listener; + } + + @Override + public Object call() throws Exception { + listener.command(); + return null; + } + } + public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException, TimeoutException, IOException { @@ -291,8 +308,8 @@ public final class ZkController { List descriptors = registerOnReconnect.getCurrentDescriptors(); // re register all descriptors + ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null; if (descriptors != null) { - ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null; for (CoreDescriptor descriptor : descriptors) { // TODO: we need to think carefully about what happens when it // was @@ -315,17 +332,23 @@ public final class ZkController { } // notify any other objects that need to know when the session was re-connected + HashSet clonedListeners; synchronized (reconnectListeners) { - for (OnReconnect listener : reconnectListeners) { - try { + clonedListeners = (HashSet)reconnectListeners.clone(); + } + // the OnReconnect operation can be expensive per listener, so do that async in the background + for (OnReconnect listener : clonedListeners) { + try { + if (executorService != null) { + executorService.submit(new OnReconnectNotifyAsync(listener)); + } else { listener.command(); - } catch (Exception exc) { - // not much we can do here other than warn in the log - log.warn("Error when notifying OnReconnect listener " + listener + " after session re-connected.", exc); } + } catch (Exception exc) { + // not much we can do here other than warn in the log + log.warn("Error when notifying OnReconnect listener " + listener + " after session re-connected.", exc); } } - } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); @@ -2170,10 +2193,37 @@ public final class ZkController { if (listener != null) { synchronized (reconnectListeners) { reconnectListeners.add(listener); + log.info("Added new OnReconnect listener "+listener); } } } + /** + * Removed a previously registered OnReconnect listener, such as when a core is removed or reloaded. + */ + public void removeOnReconnectListener(OnReconnect listener) { + if (listener != null) { + boolean wasRemoved; + synchronized (reconnectListeners) { + wasRemoved = reconnectListeners.remove(listener); + } + if (wasRemoved) { + log.info("Removed OnReconnect listener "+listener); + } else { + log.warn("Was asked to remove OnReconnect listener "+listener+ + ", but remove operation did not find it in the list of registered listeners."); + } + } + } + + Set getCurrentOnReconnectListeners() { + HashSet clonedListeners; + synchronized (reconnectListeners) { + clonedListeners = (HashSet)reconnectListeners.clone(); + } + return clonedListeners; + } + /** * Persists a config file to ZooKeeper using optimistic concurrency. * diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java index b576ad07750..d5b0564ef1b 100644 --- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java +++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java @@ -30,6 +30,8 @@ import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkCmdExecutor; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CloseHook; +import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrConfig; import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrResourceLoader; @@ -373,7 +375,7 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol public void inform(SolrCore core) { this.core = core; if (loader instanceof ZkSolrResourceLoader) { - this.zkIndexSchemaReader = new ZkIndexSchemaReader(this); + this.zkIndexSchemaReader = new ZkIndexSchemaReader(this, core); ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)loader; zkLoader.setZkIndexSchemaReader(this.zkIndexSchemaReader); } else { diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java index db1bad8a7f1..25cf158789e 100644 --- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java +++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java @@ -20,6 +20,9 @@ import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.OnReconnect; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZooKeeperException; +import org.apache.solr.core.CloseHook; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -38,13 +41,34 @@ public class ZkIndexSchemaReader implements OnReconnect { private final ManagedIndexSchemaFactory managedIndexSchemaFactory; private SolrZkClient zkClient; private String managedSchemaPath; + private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on + private boolean isRemoved = false; - public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory) { + public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) { this.managedIndexSchemaFactory = managedIndexSchemaFactory; ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)managedIndexSchemaFactory.getResourceLoader(); this.zkClient = zkLoader.getZkController().getZkClient(); - managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName(); + this.managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName(); + this.uniqueCoreId = solrCore.getName()+":"+solrCore.getStartNanoTime(); + + // register a CloseHook for the core this reader is linked to, so that we can de-register the listener + solrCore.addCloseHook(new CloseHook() { + @Override + public void preClose(SolrCore core) { + CoreContainer cc = core.getCoreDescriptor().getCoreContainer(); + if (cc.isZooKeeperAware()) { + log.info("Removing ZkIndexSchemaReader OnReconnect listener as core "+core.getName()+" is shutting down."); + ZkIndexSchemaReader.this.isRemoved = true; + cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this); + } + } + + @Override + public void postClose(SolrCore core) {} + }); + createSchemaWatcher(); + zkLoader.getZkController().addOnReconnectListener(this); } @@ -59,6 +83,11 @@ public class ZkIndexSchemaReader implements OnReconnect { zkClient.exists(managedSchemaPath, new Watcher() { @Override public void process(WatchedEvent event) { + + if (ZkIndexSchemaReader.this.isRemoved) { + return; // the core for this reader has already been removed, don't process this event + } + // session events are not change events, and do not remove the watcher if (Event.EventType.None.equals(event.getType())) { return; @@ -135,4 +164,27 @@ public class ZkIndexSchemaReader implements OnReconnect { log.error("Failed to update managed-schema watcher after session expiration due to: "+exc, exc); } } + + public String getUniqueCoreId() { + return uniqueCoreId; + } + + public String toString() { + return "ZkIndexSchemaReader: "+managedSchemaPath+", uniqueCoreId: "+uniqueCoreId; + } + + public int hashCode() { + return managedSchemaPath.hashCode()+uniqueCoreId.hashCode(); + } + + // We need the uniqueCoreId which is core name + start time nanos to be the tie breaker + // as there can be multiple ZkIndexSchemaReader instances active for the same core after + // a reload (one is initializing and the other is being shutdown) + public boolean equals(Object other) { + if (other == null) return false; + if (other == this) return true; + if (!(other instanceof ZkIndexSchemaReader)) return false; + ZkIndexSchemaReader that = (ZkIndexSchemaReader)other; + return this.managedSchemaPath.equals(that.managedSchemaPath) && this.uniqueCoreId.equals(that.uniqueCoreId); + } } diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java index cf8a65013c2..4f92e28dd0c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java @@ -16,26 +16,16 @@ */ package org.apache.solr.cloud; -import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.concurrent.TimeUnit; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.SolrTestCaseJ4.SuppressSSL; -import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.request.CoreAdminRequest; -import org.apache.solr.client.solrj.request.QueryRequest; -import org.apache.solr.client.solrj.response.CollectionAdminResponse; -import org.apache.solr.client.solrj.response.CoreAdminResponse; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; -import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.params.CollectionParams; -import org.apache.solr.common.params.ModifiableSolrParams; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,32 +55,13 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase { createCollectionRetry(testCollectionName, 1, 1, 1); cloudClient.setDefaultCollection(testCollectionName); - Replica leader = null; - String replicaState = null; - int timeoutSecs = 30; - long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS); - while (System.nanoTime() < timeout) { - Replica tmp = null; - try { - tmp = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId); - } catch (Exception exc) {} - if (tmp != null && "active".equals(tmp.getStr(ZkStateReader.STATE_PROP))) { - leader = tmp; - replicaState = "active"; - break; - } - Thread.sleep(1000); - } - assertNotNull("Could not find active leader for " + shardId + " of " + - testCollectionName + " after "+timeoutSecs+" secs; clusterState: " + - printClusterStateInfo(testCollectionName), leader); + Replica leader = getShardLeader(testCollectionName, shardId, 30 /* timeout secs */); // reload collection and wait to see the core report it has been reloaded boolean wasReloaded = reloadCollection(leader, testCollectionName); assertTrue("Collection '"+testCollectionName+"' failed to reload within a reasonable amount of time!", wasReloaded); - // cause session loss chaosMonkey.expireSession(getJettyOnPort(getReplicaPort(leader))); @@ -99,8 +70,9 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase { Thread.sleep(15000); // wait up to 15 seconds to see the replica in the active state - timeoutSecs = 15; - timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS); + String replicaState = null; + int timeoutSecs = 15; + long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS); while (System.nanoTime() < timeout) { // state of leader should be active after session loss recovery - see SOLR-7338 cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); @@ -126,53 +98,4 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase { log.info("testReloadedLeaderStateAfterZkSessionLoss succeeded ... shutting down now!"); } - - protected boolean reloadCollection(Replica replica, String testCollectionName) throws Exception { - ZkCoreNodeProps coreProps = new ZkCoreNodeProps(replica); - String coreName = coreProps.getCoreName(); - boolean reloadedOk = false; - try (HttpSolrClient client = getHttpSolrClient(coreProps.getBaseUrl())) { - CoreAdminResponse statusResp = CoreAdminRequest.getStatus(coreName, client); - long leaderCoreStartTime = statusResp.getStartTime(coreName).getTime(); - - Thread.sleep(1000); - - // send reload command for the collection - log.info("Sending RELOAD command for "+testCollectionName); - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set("action", CollectionParams.CollectionAction.RELOAD.toString()); - params.set("name", testCollectionName); - QueryRequest request = new QueryRequest(params); - request.setPath("/admin/collections"); - client.request(request); - Thread.sleep(2000); // reload can take a short while - - // verify reload is done, waiting up to 30 seconds for slow test environments - long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); - while (System.nanoTime() < timeout) { - statusResp = CoreAdminRequest.getStatus(coreName, client); - long startTimeAfterReload = statusResp.getStartTime(coreName).getTime(); - if (startTimeAfterReload > leaderCoreStartTime) { - reloadedOk = true; - break; - } - // else ... still waiting to see the reloaded core report a later start time - Thread.sleep(1000); - } - } - return reloadedOk; - } - - private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode) - throws SolrServerException, IOException { - CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); - if (resp.getResponse().get("failure") != null) { - CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete(); - req.setCollectionName(testCollectionName); - req.process(cloudClient); - resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); - if (resp.getResponse().get("failure") != null) - fail("Could not create " + testCollectionName); - } - } } diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java index 812815b0173..f4295442486 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java @@ -291,22 +291,6 @@ public class DistributedVersionInfoTest extends AbstractFullDistribZkTestBase { return vers.longValue(); } - private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode) - throws SolrServerException, IOException { - CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); - if (resp.getResponse().get("failure") != null) { - CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete(); - req.setCollectionName(testCollectionName); - req.process(cloudClient); - - resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); - - if (resp.getResponse().get("failure") != null) { - fail("Could not create " + testCollectionName); - } - } - } - protected void assertDocsExistInAllReplicas(List notLeaders, String testCollectionName, int firstDocId, diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java index 7e2c17d91a3..1f7756cb1e5 100644 --- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java @@ -428,22 +428,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { } } - private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode) - throws SolrServerException, IOException { - CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); - if (resp.getResponse().get("failure") != null) { - CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete(); - req.setCollectionName(testCollectionName); - req.process(cloudClient); - - resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); - - if (resp.getResponse().get("failure") != null) { - fail("Could not create " + testCollectionName); - } - } - } - // test inspired by SOLR-6511 protected void testLeaderZkSessionLoss() throws Exception { diff --git a/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java b/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java new file mode 100644 index 00000000000..052cd1f630b --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java @@ -0,0 +1,155 @@ +package org.apache.solr.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.lang.invoke.MethodHandles; +import java.util.Set; + +import org.apache.solr.SolrTestCaseJ4.SuppressSSL; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.common.cloud.OnReconnect; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.schema.ZkIndexSchemaReader; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; + +@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776") +public class TestOnReconnectListenerSupport extends AbstractFullDistribZkTestBase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public TestOnReconnectListenerSupport() { + super(); + sliceCount = 2; + fixShardCount(3); + } + + @BeforeClass + public static void initSysProperties() { + System.setProperty("managed.schema.mutable", "false"); + System.setProperty("enable.update.log", "true"); + } + + @Override + protected String getCloudSolrConfig() { + return "solrconfig-managed-schema.xml"; + } + + @Test + public void test() throws Exception { + waitForThingsToLevelOut(30000); + + String testCollectionName = "c8n_onreconnect_1x1"; + String shardId = "shard1"; + createCollectionRetry(testCollectionName, 1, 1, 1); + cloudClient.setDefaultCollection(testCollectionName); + + Replica leader = getShardLeader(testCollectionName, shardId, 30 /* timeout secs */); + JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader)); + + // get the ZkController for the node hosting the leader + CoreContainer cores = leaderJetty.getCoreContainer(); + ZkController zkController = cores.getZkController(); + assertNotNull("ZkController is null", zkController); + + String leaderCoreName = leader.getStr(CORE_NAME_PROP); + String leaderCoreId; + try (SolrCore leaderCore = cores.getCore(leaderCoreName)) { + assertNotNull("SolrCore for "+leaderCoreName+" not found!", leaderCore); + leaderCoreId = leaderCore.getName()+":"+leaderCore.getStartNanoTime(); + } + + // verify the ZkIndexSchemaReader is a registered OnReconnect listener + Set listeners = zkController.getCurrentOnReconnectListeners(); + assertNotNull("ZkController returned null OnReconnect listeners", listeners); + ZkIndexSchemaReader expectedListener = null; + for (OnReconnect listener : listeners) { + if (listener instanceof ZkIndexSchemaReader) { + ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener; + if (leaderCoreId.equals(reader.getUniqueCoreId())) { + expectedListener = reader; + break; + } + } + } + assertNotNull("ZkIndexSchemaReader for core " + leaderCoreName + + " not registered as an OnReconnect listener and should be", expectedListener); + + // reload the collection + boolean wasReloaded = reloadCollection(leader, testCollectionName); + assertTrue("Collection '" + testCollectionName + "' failed to reload within a reasonable amount of time!", + wasReloaded); + + // after reload, the new core should be registered as an OnReconnect listener and the old should not be + String reloadedLeaderCoreId; + try (SolrCore leaderCore = cores.getCore(leaderCoreName)) { + reloadedLeaderCoreId = leaderCore.getName()+":"+leaderCore.getStartNanoTime(); + } + + // they shouldn't be equal after reload + assertTrue(!leaderCoreId.equals(reloadedLeaderCoreId)); + + listeners = zkController.getCurrentOnReconnectListeners(); + assertNotNull("ZkController returned null OnReconnect listeners", listeners); + + expectedListener = null; // reset + for (OnReconnect listener : listeners) { + if (listener instanceof ZkIndexSchemaReader) { + ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener; + if (leaderCoreId.equals(reader.getUniqueCoreId())) { + fail("Previous core "+leaderCoreId+ + " should no longer be a registered OnReconnect listener! Current listeners: "+listeners); + } else if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) { + expectedListener = reader; + break; + } + } + } + + assertNotNull("ZkIndexSchemaReader for core "+reloadedLeaderCoreId+ + " not registered as an OnReconnect listener and should be", expectedListener); + + // try to clean up + try { + CollectionAdminRequest.deleteCollection(testCollectionName).process(cloudClient); + } catch (Exception e) { + // don't fail the test + log.warn("Could not delete collection {} after test completed", testCollectionName); + } + + listeners = zkController.getCurrentOnReconnectListeners(); + for (OnReconnect listener : listeners) { + if (listener instanceof ZkIndexSchemaReader) { + ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener; + if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) { + fail("Previous core "+reloadedLeaderCoreId+ + " should no longer be a registered OnReconnect listener after collection delete!"); + } + } + } + + log.info("TestOnReconnectListenerSupport succeeded ... shutting down now!"); + } +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java index 4f6b2c685dd..46aed08572d 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java @@ -16,6 +16,13 @@ */ package org.apache.solr.common.cloud; +/** + * Implementations are expected to implement a correct hashCode and equals + * method needed to uniquely identify the listener as listeners are managed + * in a Set. In addition, your listener implementation should call + * org.apache.solr.cloud.ZkController#removeOnReconnectListener(OnReconnect) + * when it no longer needs to be notified of ZK reconnection events. + */ public interface OnReconnect { - public void command(); + void command(); } diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java index 6028563bc16..6808f819928 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java @@ -46,9 +46,11 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.client.solrj.response.CoreAdminResponse; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.common.SolrDocument; @@ -1808,6 +1810,43 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes createCollection(collectionInfos, collName, props, client); } + protected void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode) + throws SolrServerException, IOException { + CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); + if (resp.getResponse().get("failure") != null) { + CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete(); + req.setCollectionName(testCollectionName); + req.process(cloudClient); + + resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); + + if (resp.getResponse().get("failure") != null) { + fail("Could not create " + testCollectionName); + } + } + } + + protected Replica getShardLeader(String testCollectionName, String shardId, int timeoutSecs) throws Exception { + Replica leader = null; + long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS); + while (System.nanoTime() < timeout) { + Replica tmp = null; + try { + tmp = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId); + } catch (Exception exc) {} + if (tmp != null && "active".equals(tmp.getStr(ZkStateReader.STATE_PROP))) { + leader = tmp; + break; + } + Thread.sleep(1000); + } + assertNotNull("Could not find active leader for " + shardId + " of " + + testCollectionName + " after "+timeoutSecs+" secs; clusterState: " + + printClusterStateInfo(testCollectionName), leader); + + return leader; + } + protected List ensureAllReplicasAreActive(String testCollectionName, String shardId, int shards, int rf, int maxWaitSecs) throws Exception { final RTimer timer = new RTimer(); @@ -1891,6 +1930,42 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes return cs; } + protected boolean reloadCollection(Replica replica, String testCollectionName) throws Exception { + ZkCoreNodeProps coreProps = new ZkCoreNodeProps(replica); + String coreName = coreProps.getCoreName(); + boolean reloadedOk = false; + try (HttpSolrClient client = getHttpSolrClient(coreProps.getBaseUrl())) { + CoreAdminResponse statusResp = CoreAdminRequest.getStatus(coreName, client); + long leaderCoreStartTime = statusResp.getStartTime(coreName).getTime(); + + Thread.sleep(1000); + + // send reload command for the collection + log.info("Sending RELOAD command for "+testCollectionName); + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set("action", CollectionParams.CollectionAction.RELOAD.toString()); + params.set("name", testCollectionName); + QueryRequest request = new QueryRequest(params); + request.setPath("/admin/collections"); + client.request(request); + Thread.sleep(2000); // reload can take a short while + + // verify reload is done, waiting up to 30 seconds for slow test environments + long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); + while (System.nanoTime() < timeout) { + statusResp = CoreAdminRequest.getStatus(coreName, client); + long startTimeAfterReload = statusResp.getStartTime(coreName).getTime(); + if (startTimeAfterReload > leaderCoreStartTime) { + reloadedOk = true; + break; + } + // else ... still waiting to see the reloaded core report a later start time + Thread.sleep(1000); + } + } + return reloadedOk; + } + static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client) throws IOException, SolrServerException { RequestStatusState state = null;