SOLR-12187: Replica should watch clusterstate and unload itself if its entry is removed

Cao Manh Dat 2018-04-17 20:16:31 +07:00
parent f7f12a51f3
commit 09db13f4f4
8 changed files with 186 additions and 193 deletions
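The change, in outline: when a core registers, ZkController now waits for its replica entry to appear in the collection state, then installs a CollectionStateWatcher that unloads the core as soon as that entry disappears. The sketch below is illustrative only, with placeholder class and parameter names and a simplified timeout; the committed implementation is ZkController.UnloadCoreOnDeletedWatcher in the diff that follows.

import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;

class UnloadOnDeleteSketch {

  // Looks up this core's replica entry in the collection state, or returns null if it is gone.
  static Replica replicaOrNull(DocCollection state, String shard, String coreNodeName) {
    if (state == null) return null;
    Slice slice = state.getSlice(shard);
    if (slice == null) return null;
    return slice.getReplica(coreNodeName);
  }

  static void watchAndUnload(ZkStateReader reader, CoreContainer cc,
                             String collection, String shard,
                             String coreNodeName, String coreName)
      throws InterruptedException, TimeoutException {
    // 1. At register time, wait until the replica entry shows up in the collection state.
    reader.waitForState(collection, 60, TimeUnit.SECONDS,
        (liveNodes, collectionState) -> replicaOrNull(collectionState, shard, coreNodeName) != null);

    // 2. Watch the collection; if the entry disappears later, unload the local core.
    reader.registerCollectionStateWatcher(collection, new CollectionStateWatcher() {
      @Override
      public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
        boolean removed = replicaOrNull(collectionState, shard, coreNodeName) == null;
        if (removed) {
          // delete index, data and instance dirs, matching what the committed watcher does
          cc.unload(coreName, true, true, true);
        }
        return removed; // returning true tells ZkStateReader to drop this watcher
      }
    });
  }
}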

CHANGES.txt

@@ -164,6 +164,8 @@ Bug Fixes
* SOLR-10169: PeerSync will hit an NPE on no response errors when looking for fingerprint. (Erick Erickson)
* SOLR-12187: Replica should watch clusterstate and unload itself if its entry is removed (Cao Manh Dat)
Optimizations
----------------------

ZkController.java

@@ -38,6 +38,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -65,6 +66,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DefaultZkACLProvider;
import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
@@ -1033,42 +1035,39 @@ public class ZkController {
try {
// pre register has published our down state
final String baseUrl = getBaseUrl();
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String collection = cloudDesc.getCollectionName();
final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
final String shardId = cloudDesc.getShardId();
final String coreZkNodeName = cloudDesc.getCoreNodeName();
assert coreZkNodeName != null : "we should have a coreNodeName by now";
// check replica's existence in clusterstate first
try {
zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
} catch (TimeoutException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
}
Replica replica = getReplicaOrNull(zkStateReader.getClusterState().getCollectionOrNull(collection), shardId, coreZkNodeName);
if (replica == null) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate");
}
ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
if (isRunningInNewLIR && cloudDesc.getReplicaType() != Type.PULL) {
if (isRunningInNewLIR && replica.getType() != Type.PULL) {
shardTerms.registerTerm(coreZkNodeName);
}
String shardId = cloudDesc.getShardId();
Map<String,Object> props = new HashMap<>();
// we only put a subset of props into the leader node
props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
props.put(ZkStateReader.CORE_NAME_PROP, coreName);
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
log.debug("Register replica - core:{} address:{} collection:{} shard:{}",
coreName, baseUrl, cloudDesc.getCollectionName(), shardId);
ZkNodeProps leaderProps = new ZkNodeProps(props);
coreName, baseUrl, collection, shardId);
try {
// If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = false;
final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
Replica replica = (docCollection == null) ? null : docCollection.getReplica(coreZkNodeName);
if (replica != null) {
joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
}
//TODO Why would replica be null?
if (replica == null || replica.getType() != Type.PULL) {
boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
if (replica.getType() != Type.PULL) {
joinElection(desc, afterExpiration, joinAtHead);
} else if (replica.getType() == Type.PULL) {
if (joinAtHead) {
@@ -1093,9 +1092,8 @@ public class ZkController {
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.debug("We are " + ourUrl + " and leader is " + leaderUrl);
boolean isLeader = leaderUrl.equals(ourUrl);
Replica.Type replicaType = zkStateReader.getClusterState().getCollection(collection).getReplica(coreZkNodeName).getType();
assert !(isLeader && replicaType == Type.PULL): "Pull replica became leader!";
assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
try (SolrCore core = cc.getCore(desc.getName())) {
// recover from local transaction log and wait for it to complete before
@@ -1105,7 +1103,7 @@ public class ZkController {
// leader election perhaps?
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
boolean isTlogReplicaAndNotLeader = replicaType == Replica.Type.TLOG && !isLeader;
boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader;
if (isTlogReplicaAndNotLeader) {
String commitVersion = ReplicateFromLeader.getCommitVersion(core);
if (commitVersion != null) {
@@ -1138,23 +1136,40 @@ public class ZkController {
publish(desc, Replica.State.ACTIVE);
}
if (isRunningInNewLIR && replicaType != Type.PULL) {
if (isRunningInNewLIR && replica.getType() != Type.PULL) {
// the watcher is added to a set, so multiple calls of this method will leave only one watcher
shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer()));
}
core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
} catch (Exception e) {
unregister(coreName, desc, false);
throw e;
}
// make sure we have an update cluster state right away
zkStateReader.forceUpdateCollection(collection);
// the watcher is added to a set, so multiple calls of this method will leave only one watcher
zkStateReader.registerCollectionStateWatcher(cloudDesc.getCollectionName(),
new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
return shardId;
} catch (Exception e) {
unregister(coreName, desc, false);
throw e;
} finally {
MDCLoggingContext.clear();
}
}
private Replica getReplicaOrNull(DocCollection docCollection, String shard, String coreNodeName) {
if (docCollection == null) return null;
Slice slice = docCollection.getSlice(shard);
if (slice == null) return null;
Replica replica = slice.getReplica(coreNodeName);
if (replica == null) return null;
if (!getNodeName().equals(replica.getNodeName())) return null;
return replica;
}
public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException {
log.info("{} starting background replication from leader", coreName);
ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
@@ -1359,11 +1374,7 @@ public class ZkController {
}
public void publish(final CoreDescriptor cd, final Replica.State state) throws Exception {
publish(cd, state, true);
}
public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception {
publish(cd, state, updateLastState, false);
publish(cd, state, true, false);
}
/**
@@ -1430,6 +1441,9 @@ public class ZkController {
props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
props.put(ZkStateReader.COLLECTION_PROP, collection);
props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
if (!Overseer.isLegacy(zkStateReader)) {
props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false");
}
if (numShards != null) {
props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
}
@@ -1521,7 +1535,6 @@ public class ZkController {
}
}
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
zkStateReader.unregisterCore(cloudDescriptor.getCollectionName());
if (removeCoreFromZk) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
@@ -1653,7 +1666,6 @@ public class ZkController {
"Collection {} not visible yet, but flagging it so a watch is registered when it becomes visible" :
"Registering watch for collection {}",
collectionName);
zkStateReader.registerCore(collectionName);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
@@ -2707,6 +2719,56 @@ public class ZkController {
};
}
private class UnloadCoreOnDeletedWatcher implements CollectionStateWatcher {
String coreNodeName;
String shard;
String coreName;
public UnloadCoreOnDeletedWatcher(String coreNodeName, String shard, String coreName) {
this.coreNodeName = coreNodeName;
this.shard = shard;
this.coreName = coreName;
}
@Override
// synchronized due to SOLR-11535
public synchronized boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null;
if (replicaRemoved) {
try {
log.info("Replica {} removed from clusterstate, remove it.", coreName);
getCoreContainer().unload(coreName, true, true, true);
} catch (SolrException e) {
if (!e.getMessage().contains("Cannot unload non-existent core")) {
// no need to log if the core was already unloaded
log.warn("Failed to unregister core:{}", coreName, e);
}
} catch (Exception e) {
log.warn("Failed to unregister core:{}", coreName, e);
}
}
return replicaRemoved;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UnloadCoreOnDeletedWatcher that = (UnloadCoreOnDeletedWatcher) o;
return Objects.equals(coreNodeName, that.coreNodeName) &&
Objects.equals(shard, that.shard) &&
Objects.equals(coreName, that.coreName);
}
@Override
public int hashCode() {
return Objects.hash(coreNodeName, shard, coreName);
}
}
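ZkStateReader keeps collection state watchers in a Set, so the equals/hashCode overrides above are what make repeated register() calls (for example after a ZooKeeper session reconnect) collapse to a single watcher instead of piling up duplicates. A stand-alone Java sketch of that property; the WatcherKey class and the sample names are placeholders, not part of the commit.

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

class WatcherDedupSketch {
  // Value-style key mirroring UnloadCoreOnDeletedWatcher's equals/hashCode contract.
  static final class WatcherKey {
    final String coreNodeName, shard, coreName;
    WatcherKey(String coreNodeName, String shard, String coreName) {
      this.coreNodeName = coreNodeName;
      this.shard = shard;
      this.coreName = coreName;
    }
    @Override public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof WatcherKey)) return false;
      WatcherKey that = (WatcherKey) o;
      return Objects.equals(coreNodeName, that.coreNodeName)
          && Objects.equals(shard, that.shard)
          && Objects.equals(coreName, that.coreName);
    }
    @Override public int hashCode() {
      return Objects.hash(coreNodeName, shard, coreName);
    }
  }

  public static void main(String[] args) {
    Set<WatcherKey> watchers = new HashSet<>();
    watchers.add(new WatcherKey("core_node1", "shard1", "collection1_shard1_replica_n1"));
    watchers.add(new WatcherKey("core_node1", "shard1", "collection1_shard1_replica_n1"));
    System.out.println(watchers.size()); // prints 1: the second registration is a no-op
  }
}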
/**
* Thrown during leader initiated recovery process if current node is not leader
*/

ZkContainer.java

@@ -222,22 +222,6 @@ public class ZkContainer {
public ZkController getZkController() {
return zkController;
}
public void publishCoresAsDown(List<SolrCore> cores) {
for (SolrCore core : cores) {
try {
zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
} catch (KeeperException e) {
ZkContainer.log.error("", e);
} catch (InterruptedException e) {
Thread.interrupted();
ZkContainer.log.error("", e);
} catch (Exception e) {
ZkContainer.log.error("", e);
}
}
}
public void close() {

CollectionsHandler.java

@@ -40,7 +40,6 @@ import org.apache.solr.api.Api;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
@@ -282,7 +281,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
* In SOLR-11739 we change the way the async IDs are checked to decide if one has
* already been used or not. For backward compatibility, we continue to check in the
* old way (meaning, in all the queues) for now. This extra check should be removed
* in Solr 9
*/
private static final boolean CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS = true;
@@ -306,7 +305,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
NamedList<String> r = new NamedList<>();
if (CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS && (
coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
@@ -1162,26 +1161,15 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
// Wait till we have an active leader
boolean success = false;
for (int i = 0; i < 10; i++) {
ZkCoreNodeProps zombieLeaderProps = getZombieLeader(zkController, collectionName, sliceId);
if (zombieLeaderProps != null) {
log.warn("A replica {} on node {} won the leader election, but not exist in clusterstate, " +
"remove it and waiting for another round of election",
zombieLeaderProps.getCoreName(), zombieLeaderProps.getNodeName());
try (HttpSolrClient solrClient = new HttpSolrClient.Builder(zombieLeaderProps.getBaseUrl()).build()) {
CoreAdminRequest.unloadCore(zombieLeaderProps.getCoreName(), solrClient);
}
// waiting for another election round
i = 0;
}
clusterState = zkController.getClusterState();
for (int i = 0; i < 9; i++) {
Thread.sleep(5000);
clusterState = handler.coreContainer.getZkController().getClusterState();
collection = clusterState.getCollection(collectionName);
slice = collection.getSlice(sliceId);
if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) {
success = true;
break;
}
Thread.sleep(5000);
log.warn("Force leader attempt {}. Waiting 5 secs for an active leader. State of the slice: {}", (i + 1), slice);
}
@@ -1198,25 +1186,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
}
/**
* A zombie leader is a replica that won the election but does not exist in clusterstate
* @return null if the zombie leader does not exist
*/
private static ZkCoreNodeProps getZombieLeader(ZkController zkController, String collection, String shardId) {
try {
ZkCoreNodeProps leaderProps = zkController.getLeaderProps(collection, shardId, 1000);
DocCollection docCollection = zkController.getClusterState().getCollection(collection);
Replica replica = docCollection.getReplica(leaderProps.getNodeProps().getStr(ZkStateReader.CORE_NODE_NAME_PROP));
if (replica == null) return leaderProps;
if (!replica.getNodeName().equals(leaderProps.getNodeName())) {
return leaderProps;
}
return null;
} catch (Exception e) {
return null;
}
}
public static void waitForActiveCollection(String collectionName, CoreContainer cc, SolrResponse createCollResponse)
throws KeeperException, InterruptedException {

DeleteReplicaTest.java

@@ -22,6 +22,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -34,11 +35,13 @@ import org.apache.solr.client.solrj.request.CoreStatus;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZkStateReaderAccessor;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.ZkContainer;
@@ -86,12 +89,17 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
assertTrue("Unexpected error message: " + e.getMessage(), e.getMessage().contains("state is 'active'"));
assertTrue("Data directory for " + replica.getName() + " should not have been deleted", Files.exists(dataDir));
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
.process(cluster.getSolrClient());
waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> {
Slice testShard = c.getSlice(shard.getName());
return testShard.getReplica(replica.getName()) == null;
});
// the core no longer watches the collection state since it was removed
assertEquals(watchers.size() - 1, accessor.getStateWatchers(collectionName).size());
assertFalse("Data directory for " + replica.getName() + " should have been removed", Files.exists(dataDir));
@@ -165,8 +173,63 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
}
@Test
public void deleteReplicaFromClusterState() throws Exception {
deleteReplicaFromClusterState("true");
deleteReplicaFromClusterState("false");
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
}
public void deleteReplicaFromClusterState(String legacyCloud) throws Exception {
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
final String collectionName = "deleteFromClusterState_"+legacyCloud;
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 3)
.process(cluster.getSolrClient());
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1"));
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2"));
cluster.getSolrClient().commit(collectionName);
Slice shard = getCollectionState(collectionName).getSlice("shard1");
Replica replica = getRandomReplica(shard);
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
ZkStateReader.NODE_NAME_PROP, replica.getNodeName(),
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.CORE_NODE_NAME_PROP, replica.getName(),
ZkStateReader.BASE_URL_PROP, replica.getBaseUrl());
Overseer.getStateUpdateQueue(cluster.getZkClient()).offer(Utils.toJSON(m));
waitForState("Timeout waiting for replica get deleted", collectionName,
(liveNodes, collectionState) -> collectionState.getSlice("shard1").getReplicas().size() == 2);
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
timeOut.waitFor("Waiting for replica get unloaded", () ->
replicaJetty.getCoreContainer().getCoreDescriptor(replica.getCoreName()) == null
);
// the core no longer watches the collection state since it was removed
timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
timeOut.waitFor("Waiting for watcher get removed", () ->
watchers.size() - 1 == accessor.getStateWatchers(collectionName).size()
);
CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
}
@Test
@Slow
public void raceConditionOnDeleteAndRegisterReplica() throws Exception {
final String collectionName = "raceDeleteReplica";
raceConditionOnDeleteAndRegisterReplica("true");
raceConditionOnDeleteAndRegisterReplica("false");
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
}
public void raceConditionOnDeleteAndRegisterReplica(String legacyCloud) throws Exception {
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
final String collectionName = "raceDeleteReplica_"+legacyCloud;
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
.process(cluster.getSolrClient());
waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
@@ -246,15 +309,16 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
ZkContainer.testing_beforeRegisterInZk = null;
}
waitForState("Timeout for replica:"+replica1.getName()+" register itself as DOWN after failed to register", collectionName, (liveNodes, collectionState) -> {
Slice shard = collectionState.getSlice("shard1");
Replica replica = shard.getReplica(replica1.getName());
return replica != null && replica.getState() == DOWN;
});
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
.process(cluster.getSolrClient());
while (true) {
try {
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
.process(cluster.getSolrClient());
break;
} catch (Exception e) {
// expected, when the node is not fully started
Thread.sleep(500);
}
}
waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
String leaderJettyNodeName = leaderJetty.getNodeName();

ForceLeaderTest.java

@@ -62,81 +62,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
/**
* Tests that FORCELEADER can get an active leader even in the case where a replica won the election but is not present in clusterstate
*/
@Test
@Slow
public void testZombieLeader() throws Exception {
String testCollectionName = "forceleader_zombie_leader_collection";
createCollection(testCollectionName, "conf1", 1, 3, 1);
cloudClient.setDefaultCollection(testCollectionName);
try {
List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
assertEquals("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(testCollectionName), 2, notLeaders.size());
List<JettySolrRunner> notLeaderJetties = notLeaders.stream().map(rep -> getJettyOnPort(getReplicaPort(rep)))
.collect(Collectors.toList());
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
// remove leader from clusterstate
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
ZkStateReader.CORE_NAME_PROP, leader.getCoreName(),
ZkStateReader.NODE_NAME_PROP, leader.getNodeName(),
ZkStateReader.COLLECTION_PROP, testCollectionName,
ZkStateReader.CORE_NODE_NAME_PROP, leader.getName(),
ZkStateReader.BASE_URL_PROP, leader.getBaseUrl());
Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()).offer(Utils.toJSON(m));
boolean restartOtherReplicas = random().nextBoolean();
log.info("Starting test with restartOtherReplicas:{}", restartOtherReplicas);
if (restartOtherReplicas) {
for (JettySolrRunner notLeaderJetty : notLeaderJetties) {
notLeaderJetty.stop();
}
}
cloudClient.waitForState(testCollectionName, 30, TimeUnit.SECONDS,
(liveNodes, collectionState) -> collectionState.getReplicas().size() == 2);
if (restartOtherReplicas) {
for (JettySolrRunner notLeaderJetty : notLeaderJetties) {
notLeaderJetty.start();
}
}
log.info("Before forcing leader: " + cloudClient.getZkStateReader().getClusterState()
.getCollection(testCollectionName).getSlice(SHARD1));
doForceLeader(cloudClient, testCollectionName, SHARD1);
// By now we have an active leader. Wait for recoveries to begin
waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
assertNull("Expected zombie leader get deleted", leaderJetty.getCoreContainer().getCore(leader.getCoreName()));
Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
assertNotNull(newLeader);
assertEquals(State.ACTIVE, newLeader.getState());
int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
assertEquals(2, numActiveReplicas);
// Assert that indexing works again
sendDoc(1);
cloudClient.commit();
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
} finally {
log.info("Cleaning up after the test.");
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
}
/**
* Tests that FORCELEADER can get an active leader even when only replicas with a term lower than the leader's term are live
*/

MoveReplicaTest.java

@@ -60,9 +60,6 @@ import org.slf4j.LoggerFactory;
public class MoveReplicaTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static ZkStateReaderAccessor accessor;
private static int overseerLeaderIndex;
// used by MoveReplicaHDFSTest
protected boolean inPlaceMove = true;
@@ -78,14 +75,12 @@ public class MoveReplicaTest extends SolrCloudTestCase {
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
if (jetty.getNodeName().equals(overseerLeader)) {
overseerJetty = jetty;
overseerLeaderIndex = i;
break;
}
}
if (overseerJetty == null) {
fail("no overseer leader!");
}
accessor = new ZkStateReaderAccessor(overseerJetty.getCoreContainer().getZkController().getZkStateReader());
}
protected String getSolrXml() {
@@ -137,8 +132,6 @@
}
}
Set<CollectionStateWatcher> watchers = new HashSet<>(accessor.getStateWatchers(coll));
int sourceNumCores = getNumOfCores(cloudClient, replica.getNodeName(), coll);
int targetNumCores = getNumOfCores(cloudClient, targetNode, coll);
@@ -201,9 +194,6 @@
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
assertEquals(watchers, newWatchers);
moveReplica = createMoveReplicaRequest(coll, replica, targetNode, shardId);
moveReplica.setInPlaceMove(inPlaceMove);
moveReplica.process(cloudClient);
@@ -243,8 +233,6 @@
}
}
assertTrue("replica never fully recovered", recovered);
newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
assertEquals(watchers, newWatchers);
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
}
@@ -258,8 +246,6 @@
CloudSolrClient cloudClient = cluster.getSolrClient();
Set<CollectionStateWatcher> watchers = new HashSet<>(accessor.getStateWatchers(coll));
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
create.setAutoAddReplicas(false);
cloudClient.request(create);
@@ -303,9 +289,6 @@
}
assertFalse(success);
Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
assertEquals(watchers, newWatchers);
log.info("--- current collection state: " + cloudClient.getZkStateReader().getClusterState().getCollection(coll));
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
}

ZkStateReader.java

@@ -1572,8 +1572,12 @@ public class ZkStateReader implements Closeable {
return v;
});
for (CollectionStateWatcher watcher : watchers) {
if (watcher.onStateChanged(liveNodes, collectionState)) {
removeCollectionStateWatcher(collection, watcher);
try {
if (watcher.onStateChanged(liveNodes, collectionState)) {
removeCollectionStateWatcher(collection, watcher);
}
} catch (Throwable throwable) {
LOG.warn("Error on calling watcher", throwable);
}
}
}