diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 944bc29f3ce..48bd9a819c0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -160,10 +160,12 @@ Optimizations
Bug Fixes
---------------------
-(No changes)
+
+* SOLR-14940: ReplicationHandler memory leak through SolrCore.closeHooks with unstable ZK connection. (Anver Sotnikov, Mike Drob)
Other Changes
---------------------
+
* SOLR-14954: Heavily edit reindexing.adoc (Sameul García Martínez and Erick Erickson)
================== 8.7.0 ==================
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 7e2b8720c95..1fa86c0e5fa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -52,10 +52,18 @@ public class ReplicateFromLeader {
/**
* Start a replication handler thread that will periodically pull indices from the shard leader
+ *
+ * <p>This is separate from the ReplicationHandler that listens at /replication, used for recovery
+ * and leader actions. It is simpler to discard the entire polling ReplicationHandler rather than
+ * worrying about disabling polling and correctly setting all of the leader bits if we need to reset.
+ *
+ * <p>TODO: It may be cleaner to extract the polling logic and use that directly instead of creating
+ * what might be a fairly heavyweight instance here.
+ *
* @param switchTransactionLog if true, ReplicationHandler will rotate the transaction log once
* the replication is done
*/
- public void startReplication(boolean switchTransactionLog) throws InterruptedException {
+ public void startReplication(boolean switchTransactionLog) {
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
if (cc.isShutDown()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 7daa6b16df6..0e06b401e75 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1302,7 +1302,7 @@ public class ZkController implements Closeable {
return replica;
}
- public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException {
+ public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) {
log.info("{} starting background replication from leader", coreName);
ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
synchronized (replicateFromLeader) { // synchronize to prevent any stop before we finish the start
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 6fb7b6026c6..e3489d02160 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1732,6 +1732,20 @@ public final class SolrCore implements SolrInfoBean, Closeable {
closeHooks.add(hook);
}
+ /**
+ * Remove a close callback hook.
+ *
+ * Calls {@code closeHooks.remove(hook)}; does nothing when {@code closeHooks} is {@code null}.
+ *
+ * @param hook the hook to remove from this core's close hooks
+ */
+ public void removeCloseHook(CloseHook hook) {
+ if (closeHooks != null) {
+ closeHooks.remove(hook);
+ }
+ }
+
+ /**
+ * Returns a read-only view of the registered close hooks, or an empty collection when
+ * {@code closeHooks} is {@code null}. Visible for testing.
+ *
+ * @return an unmodifiable collection of close hooks; never {@code null}
+ */
+ public Collection<CloseHook> getCloseHooks() {
+ if (closeHooks == null) {
+ return Collections.emptyList(); // closeHooks may be null (removeCloseHook guards for it); unmodifiableCollection(null) would throw NPE
+ }
+ return Collections.unmodifiableCollection(closeHooks);
+ }
+
/**
* @lucene.internal Debugging aid only. No non-test code should be released with uncommented verbose() calls.
*/
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index d814ab5a4b0..e41242966f7 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -1372,55 +1372,55 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return Boolean.TRUE.equals( enable );
}
+ private final CloseHook startShutdownHook = new CloseHook() { // close hook: shuts down executorService before core close, destroys the index fetchers after
+ @Override
+ public void preClose(SolrCore core) {
+ if (executorService != null)
+ executorService.shutdown(); // we don't wait for shutdown - this can deadlock core reload
+ }
+
+ @Override
+ public void postClose(SolrCore core) {
+ if (pollingIndexFetcher != null) {
+ pollingIndexFetcher.destroy();
+ }
+ if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) { // skip when it is the same instance destroyed just above
+ currentIndexFetcher.destroy();
+ }
+ }
+ };
+ private final CloseHook finishShutdownHook = new CloseHook() { // close hook: stops restoreExecutor and cancels restoreFuture before core close
+ @Override
+ public void preClose(SolrCore core) {
+ ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
+ if (restoreFuture != null) {
+ restoreFuture.cancel(false); // NOTE(review): if this is a java.util.concurrent.Future, false means a running task is not interrupted - confirm
+ }
+ }
+
+ @Override
+ public void postClose(SolrCore core) { // intentionally empty: all work for this hook happens in preClose
+ }
+ };
+
/**
* register a closehook
*/
private void registerCloseHook() {
- core.addCloseHook(new CloseHook() {
- @Override
- public void preClose(SolrCore core) {
- if (executorService != null) executorService.shutdown(); // we don't wait for shutdown - this can deadlock core reload
- }
-
- @Override
- public void postClose(SolrCore core) {
- if (pollingIndexFetcher != null) {
- pollingIndexFetcher.destroy();
- }
- if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
- currentIndexFetcher.destroy();
- }
- }
- });
-
- core.addCloseHook(new CloseHook() {
- @Override
- public void preClose(SolrCore core) {
- ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
- if (restoreFuture != null) {
- restoreFuture.cancel(false);
- }
- }
-
- @Override
- public void postClose(SolrCore core) {}
- });
+ core.addCloseHook(startShutdownHook); // hooks are fields (not fresh anonymous instances) so shutdown() can pass the same objects to removeCloseHook
+ core.addCloseHook(finishShutdownHook);
}
public void shutdown() {
- if (executorService != null) executorService.shutdown();
- if (pollingIndexFetcher != null) {
- pollingIndexFetcher.destroy();
- }
- if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
- currentIndexFetcher.destroy();
- }
- ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
- if (restoreFuture != null) {
- restoreFuture.cancel(false);
- }
-
+ startShutdownHook.preClose(core); // invoke the registered close-hook callbacks directly instead of duplicating their bodies here
+ startShutdownHook.postClose(core);
+ finishShutdownHook.preClose(core);
+ finishShutdownHook.postClose(core);
+
ExecutorUtil.shutdownAndAwaitTermination(executorService);
+
+ core.removeCloseHook(startShutdownHook); // cleanup already ran above; unregister so the core's closeHooks no longer holds these hooks
+ core.removeCloseHook(finishShutdownHook);
}
/**
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
index 72a3246f1a2..4e5c39eee2c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
@@ -44,6 +44,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.core.SolrCore;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
@@ -219,11 +220,29 @@ public void testCantConnectToPullReplica() throws Exception {
addDocs(30);
waitForState("Expecting node to be disconnected", collectionName, activeReplicaCount(1, 0, 0));
addDocs(40);
- waitForState("Expecting node to be disconnected", collectionName, activeReplicaCount(1, 0, 1));
+ waitForState("Expecting node to be reconnected", collectionName, activeReplicaCount(1, 0, 1));
try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
assertNumDocs(40, pullReplicaClient);
}
}
+
+ public void testCloseHooksDeletedOnReconnect() throws Exception {
+ CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 1)
+ .process(cluster.getSolrClient());
+ addDocs(10);
+
+ DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true);
+ Slice s = docCollection.getSlices().iterator().next();
+ JettySolrRunner jetty = getJettyForReplica(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0));
+ SolrCore core = jetty.getCoreContainer().getCores().iterator().next();
+
+ for (int i = 0; i < 5; i++) { // repeat the expiry several times: the hook count must stay constant rather than grow per reconnect
+ cluster.expireZkSession(jetty);
+ waitForState("Expecting node to reconnect", collectionName, activeReplicaCount(1, 0, 1));
+ // We have two active ReplicationHandler with two close hooks each, one for triggering recovery and one for doing interval polling -- NOTE(review): that accounts for 4 hooks; the asserted 5th presumably comes from another component, confirm
+ assertEquals(5, core.getCloseHooks().size());
+ }
+ }
private void assertNumDocs(int numDocs, SolrClient client, int timeoutSecs) throws InterruptedException, SolrServerException, IOException {
TimeOut t = new TimeOut(timeoutSecs, TimeUnit.SECONDS, TimeSource.NANO_TIME);