SOLR-12129: After the core is reloaded, term of the core will not be watched

This commit is contained in:
Cao Manh Dat 2018-03-21 09:35:51 +07:00
parent 9cda074e55
commit 63a145aa7b
4 changed files with 33 additions and 17 deletions

View File

@ -352,6 +352,8 @@ Bug Fixes
* SOLR-12110: Replica which failed to register in Zk can become leader (Cao Manh Dat)
* SOLR-12129: After the core is reloaded, term of the core will not be watched (Cao Manh Dat)
Optimizations
----------------------

View File

@ -20,6 +20,8 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,32 +31,40 @@ import org.slf4j.LoggerFactory;
*/
public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SolrCore solrCore;
private final CoreDescriptor coreDescriptor;
private final CoreContainer coreContainer;
// used to prevent the case when term of other replicas get changed, we redo recovery
// the idea here is with a specific term of a replica, we only do recovery one
private final AtomicLong lastTermDoRecovery;
RecoveringCoreTermWatcher(SolrCore solrCore) {
this.solrCore = solrCore;
RecoveringCoreTermWatcher(CoreDescriptor coreDescriptor, CoreContainer coreContainer) {
this.coreDescriptor = coreDescriptor;
this.coreContainer = coreContainer;
this.lastTermDoRecovery = new AtomicLong(-1);
}
@Override
public boolean onTermChanged(ZkShardTerms.Terms terms) {
if (solrCore.isClosed()) {
if (coreContainer.isShutDown()) return false;
try (SolrCore solrCore = coreContainer.getCore(coreDescriptor.getName())) {
if (solrCore == null || solrCore.isClosed()) {
return false;
}
if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
if (terms.haveHighestTermValue(coreNodeName)) return true;
if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
lastTermDoRecovery.set(terms.getTerm(coreNodeName));
solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
}
} catch (Exception e) {
log.info("Failed to watch term of core {}", coreDescriptor.getName(), e);
return false;
}
if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
if (terms.haveHighestTermValue(coreNodeName)) return true;
if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
lastTermDoRecovery.set(terms.getTerm(coreNodeName));
solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
}
return true;
}
@ -65,11 +75,11 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
RecoveringCoreTermWatcher that = (RecoveringCoreTermWatcher) o;
return solrCore.equals(that.solrCore);
return coreDescriptor.getName().equals(that.coreDescriptor.getName());
}
@Override
public int hashCode() {
return solrCore.hashCode();
return coreDescriptor.getName().hashCode();
}
}

View File

@ -1138,7 +1138,7 @@ public class ZkController {
}
if (isRunningInNewLIR && replicaType != Type.PULL) {
shardTerms.addListener(new RecoveringCoreTermWatcher(core));
shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer()));
}
core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
}

View File

@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
@ -270,6 +271,9 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
createCollection(testCollectionName, "conf1", 1, 3, 1);
cloudClient.setDefaultCollection(testCollectionName);
// term of the core still be watched even when the core is reloaded
CollectionAdminRequest.reloadCollection(testCollectionName).process(cloudClient);
sendDoc(1, 2);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));