diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 9053b5ec4a1..dbc6cecb1a7 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -352,6 +352,8 @@ Bug Fixes * SOLR-12110: Replica which failed to register in Zk can become leader (Cao Manh Dat) +* SOLR-12129: After the core is reloaded, term of the core will not be watched (Cao Manh Dat) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java index 90a500aec29..007d22147c8 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java @@ -20,6 +20,8 @@ package org.apache.solr.cloud; import java.lang.invoke.MethodHandles; import java.util.concurrent.atomic.AtomicLong; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.SolrCore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,32 +31,40 @@ import org.slf4j.LoggerFactory; */ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final SolrCore solrCore; + private final CoreDescriptor coreDescriptor; + private final CoreContainer coreContainer; // used to prevent the case when term of other replicas get changed, we redo recovery // the idea here is with a specific term of a replica, we only do recovery one private final AtomicLong lastTermDoRecovery; - RecoveringCoreTermWatcher(SolrCore solrCore) { - this.solrCore = solrCore; + RecoveringCoreTermWatcher(CoreDescriptor coreDescriptor, CoreContainer coreContainer) { + this.coreDescriptor = coreDescriptor; + this.coreContainer = coreContainer; this.lastTermDoRecovery = new AtomicLong(-1); } @Override public boolean onTermChanged(ZkShardTerms.Terms terms) { - if (solrCore.isClosed()) { + if (coreContainer.isShutDown()) return false; + + try (SolrCore solrCore = coreContainer.getCore(coreDescriptor.getName())) { + if (solrCore == null || solrCore.isClosed()) { + return false; + } + + if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true; + String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); + if (terms.haveHighestTermValue(coreNodeName)) return true; + if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) { + log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName); + lastTermDoRecovery.set(terms.getTerm(coreNodeName)); + solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor()); + } + } catch (Exception e) { + log.info("Failed to watch term of core {}", coreDescriptor.getName(), e); return false; } - if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true; - - String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); - if (terms.haveHighestTermValue(coreNodeName)) return true; - if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) { - log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName); - lastTermDoRecovery.set(terms.getTerm(coreNodeName)); - solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor()); - } - return true; } @@ -65,11 +75,11 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher { RecoveringCoreTermWatcher that = (RecoveringCoreTermWatcher) o; - return solrCore.equals(that.solrCore); + return coreDescriptor.getName().equals(that.coreDescriptor.getName()); } @Override public int hashCode() { - return solrCore.hashCode(); + return coreDescriptor.getName().hashCode(); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 7a908b28585..57f1dd55afe 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1138,7 +1138,7 @@ public class ZkController { } if (isRunningInNewLIR && replicaType != Type.PULL) { - shardTerms.addListener(new RecoveringCoreTermWatcher(core)); + shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer())); } core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true); } diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java index ce454a7b599..60d7eb7cf2c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java @@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrException; @@ -270,6 +271,9 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { createCollection(testCollectionName, "conf1", 1, 3, 1); cloudClient.setDefaultCollection(testCollectionName); + // term of the core still be watched even when the core is reloaded + CollectionAdminRequest.reloadCollection(testCollectionName).process(cloudClient); + sendDoc(1, 2); JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));