SOLR-11702: Redesign current LIR implementation

Cao Manh Dat 2018-01-29 15:55:28 +07:00
parent 00d453d27c
commit 27ef653064
23 changed files with 1936 additions and 227 deletions

View File

@ -71,6 +71,10 @@ Upgrade Notes
Before 7.3, the copied over configset was named the same as the collection name, but 7.3 onwards it will be named
with an additional ".AUTOCREATED" suffix.
* SOLR-11702: The old LIR implementation (SOLR-5495) is now deprecated and replaced.
Solr will support rolling upgrades from old 7.x versions of Solr to the new one until
the last release of the 7.x major version.
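During that window a core can be pinned to the old behaviour for rolling-upgrade testing via a per-core property; the new code paths in this commit are gated on the check sketched below (the flag itself is slated for removal in SOLR-11812):

    // defaults to the new implementation; setting lirVersion=old in
    // solrcore.properties keeps the old LIR code paths for this core
    boolean isRunningInNewLIR = "new".equals(coreDescriptor.getCoreProperty("lirVersion", "new"));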
New Features
----------------------
* SOLR-11285: Simulation framework for autoscaling. (ab)
@ -113,6 +117,8 @@ New Features
* SOLR-11617: Alias metadata is now mutable via a new MODIFYALIAS command. Metadata is returned from LISTALIASES.
(Gus Heck via David Smiley)
* SOLR-11702: Redesign current LIR implementation (Cao Manh Dat, shalin)
Bug Fixes
----------------------

View File

@ -84,7 +84,7 @@ public class JettySolrRunner {
FilterHolder debugFilter;
private boolean waitOnSolr = false;
private int lastPort = -1;
private int jettyPort = -1;
private final JettyConfig config;
private final String solrHome;
@ -280,8 +280,10 @@ public class JettySolrRunner {
@Override
public void lifeCycleStarted(LifeCycle arg0) {
lastPort = getFirstConnectorPort();
nodeProperties.setProperty("hostPort", Integer.toString(lastPort));
jettyPort = getFirstConnectorPort();
int port = jettyPort;
if (proxyPort != -1) port = proxyPort;
nodeProperties.setProperty("hostPort", Integer.toString(port));
nodeProperties.setProperty("hostContext", config.context);
root.getServletContext().setAttribute(SolrDispatchFilter.PROPERTIES_ATTRIBUTE, nodeProperties);
@ -384,7 +386,7 @@ public class JettySolrRunner {
// if started before, make a new server
if (startedBefore) {
waitOnSolr = false;
int port = reusePort ? lastPort : this.config.port;
int port = reusePort ? jettyPort : this.config.port;
init(port);
} else {
startedBefore = true;
@ -456,7 +458,7 @@ public class JettySolrRunner {
if (0 == conns.length) {
throw new RuntimeException("Jetty Server has no Connectors");
}
return (proxyPort != -1) ? proxyPort : ((ServerConnector) conns[0]).getLocalPort();
return ((ServerConnector) conns[0]).getLocalPort();
}
/**
@ -465,10 +467,10 @@ public class JettySolrRunner {
* @exception RuntimeException if there is no Connector
*/
public int getLocalPort() {
if (lastPort == -1) {
if (jettyPort == -1) {
throw new IllegalStateException("You cannot get the port until this instance has started");
}
return (proxyPort != -1) ? proxyPort : lastPort;
return (proxyPort != -1) ? proxyPort : jettyPort;
}
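The hunks above split the single lastPort field into the real Jetty port and an optional debug-proxy port, so a restart re-binds the port Jetty actually owned while the node keeps advertising the proxy. A condensed sketch of the two resolutions (paraphrasing the code above, not a new API):

    // port advertised in node properties / getLocalPort(): proxy wins when configured
    int advertisedPort = (proxyPort != -1) ? proxyPort : jettyPort;
    // port used to re-init the server on restart: always the real Jetty port
    int restartPort = reusePort ? jettyPort : config.port;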
/**

View File

@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Future;
@ -491,7 +492,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
rejoinLeaderElection(core);
}
}
if (isLeader) {
// check for any replicas in my shard that were set to down by the previous leader
try {
@ -530,6 +531,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
return docCollection.getReplica(replicaName);
}
@Deprecated
public void checkLIR(String coreName, boolean allReplicasInLine)
throws InterruptedException, KeeperException, IOException {
if (allReplicasInLine) {
@ -551,7 +553,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP), Replica.State.ACTIVE, core.getCoreDescriptor(), true);
}
}
} else {
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
@ -567,7 +569,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
}
@Deprecated
private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
try (SolrCore core = cc.getCore(coreName)) {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
@ -577,10 +580,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
if (coll == null || shardId == null) {
log.error("Cannot start leader-initiated recovery on new leader (core="+
coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
return;
}
String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(coll, shardId);
List<String> replicas = null;
try {
@ -588,21 +591,28 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
} catch (NoNodeException nne) {
// this can be ignored
}
if (replicas != null && replicas.size() > 0) {
for (String replicaCoreNodeName : replicas) {
if (coreNodeName.equals(replicaCoreNodeName))
continue; // added safe-guard so we don't mark this core as down
if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
// the replica registered its term, so it is running with the new LIR implementation
// we can put this replica into recovery by increasing our term
zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));
continue;
}
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
+ lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
List<ZkCoreNodeProps> replicaProps =
zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName);
if (replicaProps != null && replicaProps.size() > 0) {
ZkCoreNodeProps coreNodeProps = null;
for (ZkCoreNodeProps p : replicaProps) {
if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) {
@ -610,17 +620,18 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
break;
}
}
zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
collection, shardId, coreNodeProps, core.getCoreDescriptor(),
false /* forcePublishState */);
}
}
}
}
}
} // core gets closed automagically
}
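Under the new design the leader no longer spawns a LeaderInitiatedRecoveryThread for term-registered replicas; it only raises terms and lets the replica's own watcher react. The essential call, extracted from the loop above:

    // raising the leader's term above the lagging replica's puts that replica
    // into recovery; RecoveringCoreTermWatcher on the replica does the rest
    zkController.getShardTerms(collection, shardId)
        .ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));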
// returns true if all replicas are found to be up, false if not
private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
@ -743,7 +754,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// to make sure others participate in sync and leader election, we can be leader
return true;
}
String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
&& !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
return false;
}
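A worked example of this gate, for a terms node of:

    { "replica1": 2, "replica2": 1 }

replica2 returns false here because its term is below the maximum, so it cannot win the election until it recovers and its term is set equal to the max; replica1 passes the check.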
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
log.debug("My last published State was Active, it's okay to be the leader.");
return true;

View File

@ -45,6 +45,7 @@ import java.util.List;
* replica; used by a shard leader to nag a replica into recovering after the
* leader experiences an error trying to send an update request to the replica.
*/
@Deprecated
public class LeaderInitiatedRecoveryThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Start recovery of a core if its term is less than the leader's term
*/
public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SolrCore solrCore;
// used to prevent redoing recovery when the terms of other replicas change
// the idea: for a given term value of this replica, we only do recovery once
private final AtomicLong lastTermDoRecovery;
RecoveringCoreTermWatcher(SolrCore solrCore) {
this.solrCore = solrCore;
this.lastTermDoRecovery = new AtomicLong(-1);
}
@Override
public boolean onTermChanged(ZkShardTerms.Terms terms) {
if (solrCore.isClosed()) {
return false;
}
if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
if (terms.canBecomeLeader(coreNodeName)) return true;
if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
lastTermDoRecovery.set(terms.getTerm(coreNodeName));
solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
}
return true;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RecoveringCoreTermWatcher that = (RecoveringCoreTermWatcher) o;
return solrCore.equals(that.solrCore);
}
@Override
public int hashCode() {
return solrCore.hashCode();
}
}
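The watcher is wired up during core registration (see the ZkController hunk later in this commit); a condensed sketch of that wiring:

    // in ZkController#register, for non-PULL replicas running the new LIR
    ZkShardTerms shardTerms = zkController.getShardTerms(collection, shardId);
    shardTerms.registerTerm(coreZkNodeName);                     // term starts at 0
    shardTerms.addListener(new RecoveringCoreTermWatcher(core)); // recovers on term bumps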

View File

@ -35,8 +35,10 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@ -458,7 +460,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
core.getCoreDescriptor());
return;
}
// we temporarily ignore peersync for tlog replicas
boolean firstTime = replicaType != Replica.Type.TLOG;
@ -516,21 +518,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.stopReplicationFromLeader(coreName);
}
final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
Future<RecoveryInfo> replayFuture = null;
while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
if (isClosed()) {
LOG.info("RecoveryStrategy has been closed");
break;
}
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
boolean isLeader = leaderUrl.equals(ourUrl);
boolean isLeader = leader.getCoreUrl().equals(ourUrl);
if (isLeader && !cloudDesc.isLeader()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
}
@ -541,12 +540,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
return;
}
LOG.info("Begin buffering updates. core=[{}]", coreName);
ulog.bufferUpdates();
replayed = false;
LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
ourUrl);
zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
@ -565,7 +564,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
break;
}
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
if (isClosed()) {
LOG.info("RecoveryStrategy has been closed");
@ -585,11 +584,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
// first thing we just try to sync
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core,
Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
Collections.singletonList(leader.getCoreUrl()), ulog.getNumRecordsToKeep(), false, false);
peerSync.setStartingVersions(recentVersions);
boolean syncSuccess = peerSync.sync().isSuccess();
if (syncSuccess) {
@ -623,7 +622,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
replicate(zkController.getNodeName(), core, leaderprops);
replicate(zkController.getNodeName(), core, leader);
if (isClosed()) {
LOG.info("RecoveryStrategy has been closed");
@ -745,6 +744,48 @@ public class RecoveryStrategy implements Runnable, Closeable {
LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
int numTried = 0;
while (true) {
CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
if (mayPutReplicaAsDown && numTried == 1 &&
docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
// this operation may take a long time; by putting the replica into the DOWN state, clients won't query it
zkController.publish(coreDesc, Replica.State.DOWN);
}
numTried++;
final Replica leaderReplica = zkStateReader.getLeaderRetry(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
if (isClosed()) {
return leaderReplica;
}
if (leaderReplica.getCoreUrl().equals(ourUrl)) {
return leaderReplica;
}
try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
.withSocketTimeout(1000)
.withConnectionTimeout(1000)
.build()) {
SolrPingResponse resp = httpSolrClient.ping();
return leaderReplica;
} catch (IOException e) {
LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
Thread.sleep(500);
} catch (Exception e) {
if (e.getCause() instanceof IOException) {
LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
Thread.sleep(500);
} else {
return leaderReplica;
}
}
}
}
public static Runnable testing_beforeReplayBufferingUpdates;
final private Future<RecoveryInfo> replay(SolrCore core)

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.util.HashMap;
import java.util.Map;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.core.CoreDescriptor;
/**
* Used to manage all ZkShardTerms of a collection
*/
class ZkCollectionTerms implements AutoCloseable {
private final String collection;
private final Map<String, ZkShardTerms> terms;
private final SolrZkClient zkClient;
ZkCollectionTerms(String collection, SolrZkClient client) {
this.collection = collection;
this.terms = new HashMap<>();
this.zkClient = client;
ObjectReleaseTracker.track(this);
}
public ZkShardTerms getShard(String shardId) {
synchronized (terms) {
if (!terms.containsKey(shardId)) terms.put(shardId, new ZkShardTerms(collection, shardId, zkClient));
return terms.get(shardId);
}
}
public void remove(String shardId, CoreDescriptor coreDescriptor) {
synchronized (terms) {
if (getShard(shardId).removeTerm(coreDescriptor)) {
terms.remove(shardId).close();
}
}
}
public void close() {
synchronized (terms) {
terms.values().forEach(ZkShardTerms::close);
}
ObjectReleaseTracker.release(this);
}
}
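ZkShardTerms instances are created lazily per shard and torn down once their last core goes away; a usage sketch (collection and shard names are illustrative):

    ZkCollectionTerms collectionTerms = new ZkCollectionTerms("myCollection", zkClient);
    ZkShardTerms shard1Terms = collectionTerms.getShard("shard1"); // created on first access
    // on core unregister: removes this core's term and closes the shard's
    // ZkShardTerms once no listeners remain
    collectionTerms.remove("shard1", coreDescriptor);
    collectionTerms.close(); // closes any remaining ZkShardTerms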

View File

@ -193,7 +193,6 @@ public class ZkController {
private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
private final SolrZkClient zkClient;
private final ZkCmdExecutor cmdExecutor;
public final ZkStateReader zkStateReader;
private SolrCloudManager cloudManager;
private CloudSolrClient cloudSolrClient;
@ -210,6 +209,7 @@ public class ZkController {
private LeaderElector overseerElector;
private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
private final Map<String, ZkCollectionTerms> collectionToTerms = new HashMap<>();
// for now, this can be null in tests, in which case recovery will be inactive, and other features
// may accept defaults or use mocks rather than pulling things from a CoreContainer
@ -226,6 +226,7 @@ public class ZkController {
private volatile boolean isClosed;
@Deprecated
// keeps track of replicas that have been asked to recover by leaders running on this node
private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<String, String>();
@ -323,7 +324,7 @@ public class ZkController {
@Override
public void command() {
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
clearZkCollectionTerms();
try {
zkStateReader.createClusterStateWatchersAndUpdate();
@ -435,7 +436,6 @@ public class ZkController {
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
cmdExecutor = new ZkCmdExecutor(clientTimeout);
zkStateReader = new ZkStateReader(zkClient, () -> {
if (cc != null) cc.securityNodeChanged();
});
@ -547,6 +547,9 @@ public class ZkController {
*/
public void close() {
this.isClosed = true;
synchronized (collectionToTerms) {
collectionToTerms.values().forEach(ZkCollectionTerms::close);
}
try {
for (ElectionContext context : electionContexts.values()) {
try {
@ -1034,7 +1037,14 @@ public class ZkController {
final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
assert coreZkNodeName != null : "we should have a coreNodeName by now";
ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
if (isRunningInNewLIR) {
shardTerms.registerTerm(coreZkNodeName);
}
String shardId = cloudDesc.getShardId();
Map<String,Object> props = new HashMap<>();
// we only put a subset of props into the leader node
@ -1118,15 +1128,17 @@ public class ZkController {
}
}
boolean didRecovery
= checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
= checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, shardId, core, cc, afterExpiration);
if (!didRecovery) {
if (isTlogReplicaAndNotLeader) {
startReplicationFromLeader(coreName, true);
}
publish(desc, Replica.State.ACTIVE);
}
if (isRunningInNewLIR && replicaType != Type.PULL) {
shardTerms.addListener(new RecoveringCoreTermWatcher(core));
}
core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
}
@ -1295,7 +1307,7 @@ public class ZkController {
* Returns whether or not a recovery was started
*/
private boolean checkRecovery(boolean recoverReloadedCores, final boolean isLeader, boolean skipRecovery,
final String collection, String shardId,
final String collection, String coreZkNodeName, String shardId,
SolrCore core, CoreContainer cc, boolean afterExpiration) {
if (SKIP_AUTO_RECOVERY) {
log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
@ -1322,6 +1334,13 @@ public class ZkController {
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
return true;
}
ZkShardTerms zkShardTerms = getShardTerms(collection, shardId);
if (zkShardTerms.registered(coreZkNodeName) && !zkShardTerms.canBecomeLeader(coreZkNodeName)) {
log.info("Leader's term larger than core " + core.getName() + "; starting recovery process");
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
return true;
}
} else {
log.info("I am the leader, no recovery necessary");
}
@ -1372,6 +1391,7 @@ public class ZkController {
String shardId = cd.getCloudDescriptor().getShardId();
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
// If the leader initiated recovery, then verify that this replica has performed
// recovery as requested before becoming active; don't even look at lirState if going down
if (state != Replica.State.DOWN) {
@ -1394,7 +1414,7 @@ public class ZkController {
}
}
}
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, "state");
props.put(ZkStateReader.STATE_PROP, state.toString());
@ -1430,6 +1450,11 @@ public class ZkController {
log.info("The core '{}' had failed to initialize before.", cd.getName());
}
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new"));
if (state == Replica.State.RECOVERING && isRunningInNewLIR) {
getShardTerms(collection, shardId).setEqualsToMax(coreNodeName);
}
ZkNodeProps m = new ZkNodeProps(props);
if (updateLastState) {
@ -1441,23 +1466,28 @@ public class ZkController {
}
}
private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
final ClusterState state, final String coreNodeName) {
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String shardId = state.getShardId(getNodeName(), desc.getName());
if (shardId != null) {
cloudDesc.setShardId(shardId);
return false;
}
return true;
}
public ZkShardTerms getShardTerms(String collection, String shardId) {
return getCollectionTerms(collection).getShard(shardId);
}
private ZkCollectionTerms getCollectionTerms(String collection) {
synchronized (collectionToTerms) {
if (!collectionToTerms.containsKey(collection)) collectionToTerms.put(collection, new ZkCollectionTerms(collection, zkClient));
return collectionToTerms.get(collection);
}
}
public void clearZkCollectionTerms() {
synchronized (collectionToTerms) {
collectionToTerms.values().forEach(ZkCollectionTerms::close);
collectionToTerms.clear();
}
}
public void unregister(String coreName, CoreDescriptor cd) throws Exception {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName();
getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd);
if (Strings.isNullOrEmpty(collection)) {
log.error("No collection was specified.");
@ -1733,7 +1763,7 @@ public class ZkController {
boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
if (!isLeader && !SKIP_AUTO_RECOVERY) {
// detect if this core is in leader-initiated recovery and if so,
// then we don't need the leader to wait on seeing the down state
Replica.State lirState = null;
try {
@ -1743,9 +1773,9 @@ public class ZkController {
" is in leader-initiated recovery due to: " + exc, exc);
}
if (lirState != null) {
log.debug("Replica " + myCoreNodeName +
" is already in leader-initiated recovery, so not waiting for leader to see down state.");
if (lirState != null || !getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
log.debug("Term of replica " + myCoreNodeName +
" is already less than leader, so not waiting for leader to see down state.");
} else {
log.info("Replica " + myCoreNodeName +
@ -2055,6 +2085,7 @@ public class ZkController {
* false means the node is not live either, so no point in trying to send recovery commands
* to it.
*/
@Deprecated
public boolean ensureReplicaInLeaderInitiatedRecovery(
final CoreContainer container,
final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
@ -2117,13 +2148,14 @@ public class ZkController {
" is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
replicaCoreProps.getCoreName(), replicaCoreNodeName);
// publishDownState will be false to avoid publishing the "down" state too many times
// as many errors can occur together and will each call into this method (SOLR-6189)
}
}
return nodeIsLive;
}
@Deprecated
public boolean isReplicaInRecoveryHandling(String replicaUrl) {
boolean exists = false;
synchronized (replicasInLeaderInitiatedRecovery) {
@ -2132,12 +2164,14 @@ public class ZkController {
return exists;
}
@Deprecated
public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
synchronized (replicasInLeaderInitiatedRecovery) {
replicasInLeaderInitiatedRecovery.remove(replicaUrl);
}
}
@Deprecated
public Replica.State getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
final Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
if (stateObj == null) {
@ -2147,6 +2181,7 @@ public class ZkController {
return stateStr == null ? null : Replica.State.getState(stateStr);
}
@Deprecated
public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
if (collection == null || shardId == null || coreNodeName == null)
@ -2191,6 +2226,7 @@ public class ZkController {
return stateObj;
}
@Deprecated
public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) {
if (collection == null || shardId == null || coreNodeName == null) {
@ -2199,12 +2235,12 @@ public class ZkController {
+ "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
return; // if we don't have complete data about a core in cloud mode, do nothing
}
assert leaderCd != null;
assert leaderCd.getCloudDescriptor() != null;
String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
if (state == Replica.State.ACTIVE) {
@ -2269,29 +2305,29 @@ public class ZkController {
private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd,
String znodePath, byte[] znodeData,
boolean retryOnConnLoss) throws KeeperException, InterruptedException {
if (!leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName());
ElectionContext context = electionContexts.get(key);
// we make sure we locally think we are the leader before and after getting the context - then
// we only try zk if we still think we are the leader and have our leader context
if (context == null || !leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
// we think we are the leader - get the expected shard leader version
// we use this version and multi to ensure *only* the current zk registered leader
// for a shard can put a replica into LIR
Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).getLeaderZkNodeParentVersion();
// TODO: should we do this optimistically to avoid races?
if (zkClient.exists(znodePath, retryOnConnLoss)) {
List<Op> ops = new ArrayList<>(2);
@ -2306,7 +2342,7 @@ public class ZkController {
} catch (KeeperException.NodeExistsException nee) {
// if it exists, that's great!
}
// we only create the entry if the context we are using is registered as the current leader in ZK
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
@ -2316,11 +2352,13 @@ public class ZkController {
}
}
public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
@Deprecated
public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
return "/collections/" + collection + "/leader_initiated_recovery/" + shardId;
}
public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
@Deprecated
public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
return getLeaderInitiatedRecoveryZnodePath(collection, shardId) + "/" + coreNodeName;
}
@ -2608,12 +2646,6 @@ public class ZkController {
};
}
public String getLeaderSeqPath(String collection, String coreNodeName) {
ContextKey key = new ContextKey(collection, coreNodeName);
ElectionContext context = electionContexts.get(key);
return context != null ? context.leaderSeqPath : null;
}
/**
* Thrown during leader initiated recovery process if current node is not leader
*/

View File

@ -0,0 +1,475 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class used for interacting with a ZK term node.
* Each ZK term node relates to a shard of a collection and has this format (in JSON)
* <p>
* <code>
* {
* "replicaNodeName1" : 1,
* "replicaNodeName2" : 2,
* ..
* }
* </code>
* <p>
* The values corresponding to the replicas are called terms.
* Only replicas with the highest term value are considered up to date and able to become leader and serve queries.
* <p>
* Terms can only be updated in two strict ways:
* <ul>
* <li>A replica sets its term equal to the leader's term
* <li>The leader increases its term, and the terms of in-sync replicas, by 1, leaving replicas that need recovery behind
* </ul>
* This class should not be reused after a {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} event
*/
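A worked example of the two rules, for a shard with replicas r1 (the leader) and r2:

    { "r1": 1, "r2": 1 }   // both in sync
    { "r1": 2, "r2": 1 }   // r1 could not replicate an update to r2: ensureTermsIsHigher("r1", {"r2"})
    { "r1": 2, "r2": 2 }   // r2 finished recovery: setEqualsToMax("r2")

While its term trails the maximum, r2 can neither become leader nor, in the new update path, receive forwarded updates from the leader.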
public class ZkShardTerms implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Object writingLock = new Object();
private final String collection;
private final String shard;
private final String znodePath;
private final SolrZkClient zkClient;
private final Set<CoreTermWatcher> listeners = new HashSet<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private Terms terms;
// Listener of a core for shard's term change events
interface CoreTermWatcher {
// return true if the listener wants to be triggered on subsequent term changes
boolean onTermChanged(Terms terms);
}
public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
this.collection = collection;
this.shard = shard;
this.zkClient = zkClient;
ensureTermNodeExist();
refreshTerms();
retryRegisterWatcher();
ObjectReleaseTracker.track(this);
}
/**
* Ensure that the leader's term is higher than the terms of some replicas
* @param leader coreNodeName of the leader
* @param replicasNeedingRecovery set of replicas whose terms must end up lower than the leader's term
*/
public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
Terms newTerms;
while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
if (forceSaveTerms(newTerms)) return;
}
}
/**
* Can this replica become leader? (i.e. is its term equal to the leader's term?)
* @param coreNodeName of the replica
* @return true if this replica can become leader, false otherwise
*/
public boolean canBecomeLeader(String coreNodeName) {
return terms.canBecomeLeader(coreNodeName);
}
/**
* Has this replica registered its term? Registration is a sign that the replica is running the new LIR implementation.
* @param coreNodeName of the replica
* @return true if this replica has registered its term, false otherwise
*/
public boolean registered(String coreNodeName) {
return terms.getTerm(coreNodeName) != null;
}
public void close() {
// no watcher will be registered
isClosed.set(true);
synchronized (listeners) {
listeners.clear();
}
ObjectReleaseTracker.release(this);
}
// package private for testing, only used by tests
Map<String, Long> getTerms() {
synchronized (writingLock) {
return new HashMap<>(terms.values);
}
}
/**
* Add a listener; when the shard's terms are next updated, the listener will be called
*/
void addListener(CoreTermWatcher listener) {
synchronized (listeners) {
listeners.add(listener);
}
}
/**
* Remove the coreNodeName from the terms map and also remove any expired listeners
* @return true if this object should not be reused
*/
boolean removeTerm(CoreDescriptor cd) {
int numListeners;
synchronized (listeners) {
// drop listeners whose SolrCore is already closed
listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms));
numListeners = listeners.size();
}
Terms newTerms;
while ( (newTerms = terms.removeTerm(cd.getCloudDescriptor().getCoreNodeName())) != null) {
try {
if (saveTerms(newTerms)) return numListeners == 0;
} catch (KeeperException.NoNodeException e) {
return true;
}
}
return true;
}
/**
* Register a replica's term (term value will be 0).
* If a term is already associated with this replica, do nothing
* @param coreNodeName of the replica
*/
void registerTerm(String coreNodeName) {
Terms newTerms;
while ( (newTerms = terms.registerTerm(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
/**
* Set a replica's term equal to the leader's term
* @param coreNodeName of the replica
*/
public void setEqualsToMax(String coreNodeName) {
Terms newTerms;
while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
public long getTerm(String coreNodeName) {
Long term = terms.getTerm(coreNodeName);
return term == null? -1 : term;
}
// package private for testing, only used by tests
int getNumListeners() {
synchronized (listeners) {
return listeners.size();
}
}
/**
* Set new terms to ZK.
* If the corresponding ZK term node does not exist, create it
* @param newTerms to be set
* @return true if the terms are saved successfully to ZK, false otherwise
*/
private boolean forceSaveTerms(Terms newTerms) {
try {
return saveTerms(newTerms);
} catch (KeeperException.NoNodeException e) {
ensureTermNodeExist();
return false;
}
}
/**
* Set new terms to ZK; the version of the new terms must match the current version of the ZK term node
* @param newTerms to be set
* @return true if the terms are saved successfully to ZK, false otherwise
* @throws KeeperException.NoNodeException if the corresponding ZK term node does not exist
*/
private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException {
byte[] znodeData = Utils.toJSON(newTerms.values);
try {
Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
setNewTerms(new Terms(newTerms.values, stat.getVersion()));
return true;
} catch (KeeperException.BadVersionException e) {
log.info("Failed to save terms, version is not match, retrying");
refreshTerms();
} catch (KeeperException.NoNodeException e) {
throw e;
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error save shard term for collection:" + collection, e);
}
return false;
}
/**
* Create the corresponding ZK term node
*/
private void ensureTermNodeExist() {
String path = "/collections/"+collection+ "/terms";
try {
if (!zkClient.exists(path, true)) {
try {
zkClient.makePath(path, true);
} catch (KeeperException.NodeExistsException e) {
// it's okay if another beats us creating the node
}
}
path += "/"+shard;
if (!zkClient.exists(path, true)) {
try {
Map<String, Long> initialTerms = new HashMap<>();
zkClient.create(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException e) {
// it's okay if another beats us creating the node
}
}
} catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
} catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
}
}
/**
* Fetch latest terms from ZK
*/
public void refreshTerms() {
Terms newTerms;
try {
Stat stat = new Stat();
byte[] data = zkClient.getData(znodePath, null, stat, true);
newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
} catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
} catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
}
setNewTerms(newTerms);
}
/**
* Retry registering a watcher on the corresponding ZK term node
*/
private void retryRegisterWatcher() {
while (!isClosed.get()) {
try {
registerWatcher();
return;
} catch (KeeperException.SessionExpiredException | KeeperException.AuthFailedException e) {
isClosed.set(true);
log.error("Failed watching shard term for collection: {} due to unrecoverable exception", collection, e);
return;
} catch (KeeperException e) {
log.warn("Failed watching shard term for collection:{}, retrying!", collection, e);
try {
zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
} catch (TimeoutException te) {
if (Thread.interrupted()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, te);
}
}
}
}
}
/**
* Register a watcher on the corresponding ZK term node
*/
private void registerWatcher() throws KeeperException {
Watcher watcher = event -> {
// session events are not change events, and do not remove the watcher
if (Watcher.Event.EventType.None == event.getType()) {
return;
}
retryRegisterWatcher();
// Some events may be missed while the watcher is being re-registered, so it is safer to refresh terms after registering it
refreshTerms();
};
try {
// exists operation is faster than getData operation
zkClient.exists(znodePath, watcher, true);
} catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, e);
}
}
/**
* Atomically update {@link ZkShardTerms#terms} and call listeners
* @param newTerms to be set
*/
private void setNewTerms(Terms newTerms) {
boolean isChanged = false;
synchronized (writingLock) {
if (terms == null || newTerms.version > terms.version) {
terms = newTerms;
isChanged = true;
}
}
if (isChanged) onTermUpdates(newTerms);
}
private void onTermUpdates(Terms newTerms) {
synchronized (listeners) {
listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
}
}
/**
* Holds the term values; this class is immutable
*/
static class Terms {
private final Map<String, Long> values;
// ZK node version
private final int version;
public Terms () {
this(new HashMap<>(), 0);
}
public Terms(Map<String, Long> values, int version) {
this.values = values;
this.version = version;
}
/**
* Can this replica become leader? (i.e. is its term equal to the leader's term?)
* @param coreNodeName of the replica
* @return true if this replica can become leader, false otherwise
*/
boolean canBecomeLeader(String coreNodeName) {
if (values.isEmpty()) return true;
long maxTerm = Collections.max(values.values());
return values.getOrDefault(coreNodeName, 0L) == maxTerm;
}
Long getTerm(String coreNodeName) {
return values.get(coreNodeName);
}
/**
* Return a new {@link Terms} in which the term of {@code leader} is higher than the terms of {@code replicasNeedingRecovery}
* @param leader coreNodeName of the leader
* @param replicasNeedingRecovery set of replicas whose terms should be lower than the leader's term
* @return null if the term of {@code leader} is already higher than the terms of {@code replicasNeedingRecovery}
*/
Terms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
if (!values.containsKey(leader)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
}
boolean changed = false;
boolean foundReplicasInLowerTerms = false;
HashMap<String, Long> newValues = new HashMap<>(values);
long leaderTerm = newValues.get(leader);
for (String replica : newValues.keySet()) {
if (replicasNeedingRecovery.contains(replica)) foundReplicasInLowerTerms = true;
if (Objects.equals(newValues.get(replica), leaderTerm)) {
if(replicasNeedingRecovery.contains(replica)) {
changed = true;
} else {
newValues.put(replica, leaderTerm+1);
}
}
}
// We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
// this may indicate that the current value is stale
if (!changed && foundReplicasInLowerTerms) return null;
return new Terms(newValues, version);
}
/**
* Return a new {@link Terms} in which the term of {@code coreNodeName} is removed
* @param coreNodeName of the replica
* @return null if the term of {@code coreNodeName} does not exist
*/
Terms removeTerm(String coreNodeName) {
if (!values.containsKey(coreNodeName)) return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(coreNodeName);
return new Terms(newValues, version);
}
/**
* Return a new {@link Terms} in which the associated term of {@code coreNodeName} is not null
* @param coreNodeName of the replica
* @return null if a term for {@code coreNodeName} already exists
*/
Terms registerTerm(String coreNodeName) {
if (values.containsKey(coreNodeName)) return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, 0L);
return new Terms(newValues, version);
}
/**
* Return a new {@link Terms} in which the term of {@code coreNodeName} is set to the maximum term
* @param coreNodeName of the replica
* @return null if the term of {@code coreNodeName} is already the maximum
*/
Terms setEqualsToMax(String coreNodeName) {
long maxTerm;
try {
maxTerm = Collections.max(values.values());
} catch (NoSuchElementException e){
maxTerm = 0;
}
if (values.get(coreNodeName) == maxTerm) return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, maxTerm);
return new Terms(newValues, version);
}
}
}
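All mutators above share one optimistic-concurrency shape: derive a new immutable Terms from the cached copy, attempt a versioned setData, and on a version conflict refresh from ZK and recompute. A distilled sketch of that loop (paraphrasing registerTerm/setEqualsToMax, not a new API):

    Terms newTerms;
    // the Terms methods return null when there is nothing to change, ending the loop
    while ((newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
      // saveTerms() does a versioned setData; on BadVersionException it calls
      // refreshTerms() and returns false, so we recompute against fresh terms
      if (forceSaveTerms(newTerms)) break;
    }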

View File

@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
@ -392,7 +393,22 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) {
log.debug("Check for collection zkNode:" + collection);
String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
// clean up old terms node
String termsPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms";
try {
if (stateManager.hasData(termsPath)) {
List<String> paths = stateManager.listData(termsPath);
for (String path : paths) {
stateManager.removeData(termsPath + "/" + path, -1);
}
stateManager.removeData(termsPath, -1);
}
} catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
} catch (KeeperException | IOException | BadVersionException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
}
try {
if (!stateManager.hasData(collectionPath)) {
log.debug("Creating collection in ZooKeeper:" + collection);

View File

@ -28,8 +28,10 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.function.BiConsumer;
import com.google.common.collect.ImmutableSet;
@ -47,6 +49,7 @@ import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.rule.ReplicaAssigner;
@ -1067,7 +1070,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
private static void forceLeaderElection(SolrQueryRequest req, CollectionsHandler handler) {
ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
ZkController zkController = handler.coreContainer.getZkController();
ClusterState clusterState = zkController.getClusterState();
String collectionName = req.getParams().required().get(COLLECTION_PROP);
String sliceId = req.getParams().required().get(SHARD_ID_PROP);
@ -1079,7 +1083,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
"No shard with name " + sliceId + " exists for collection " + collectionName);
}
try {
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, slice.getName(), zkController.getZkClient())) {
// if an active replica is the leader, then all is fine already
Replica leader = slice.getLeader();
if (leader != null && leader.getState() == State.ACTIVE) {
@ -1096,20 +1100,37 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
handler.coreContainer.getZkController().getZkClient().clean(lirPath);
}
final Set<String> liveNodes = clusterState.getLiveNodes();
List<Replica> liveReplicas = slice.getReplicas().stream()
.filter(rep -> liveNodes.contains(rep.getNodeName())).collect(Collectors.toList());
boolean shouldIncreaseReplicaTerms = liveReplicas.stream()
.noneMatch(rep -> zkShardTerms.registered(rep.getName()) && zkShardTerms.canBecomeLeader(rep.getName()));
// we won't increase replicas' terms if there is a live replica whose term equals the leader's
if (shouldIncreaseReplicaTerms) {
OptionalLong optionalMaxTerm = liveReplicas.stream()
.filter(rep -> zkShardTerms.registered(rep.getName()))
.mapToLong(rep -> zkShardTerms.getTerm(rep.getName()))
.max();
// raise the terms of the least out-of-sync live replicas to the maximum
if (optionalMaxTerm.isPresent()) {
liveReplicas.stream()
.filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
.forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
}
}
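A worked example of this block, assuming a terms node of {r1: 3, r2: 2, r3: 2} where r1 sits on a dead node and r2, r3 are live: no live replica holds the max term, so shouldIncreaseReplicaTerms is true; the max term among live registered replicas is 2, and setEqualsToMax lifts both to the global maximum:

    { "r1": 3, "r2": 2, "r3": 2 }   // r1's node is down; no live replica at the max
    // optionalMaxTerm over live replicas = 2 -> setEqualsToMax(r2), setEqualsToMax(r3)
    { "r1": 3, "r2": 3, "r3": 3 }   // r2 and r3 are now eligible to win the election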
// Call all live replicas to prepare themselves for leadership, e.g. set last published
// state to active.
for (Replica rep : slice.getReplicas()) {
if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
for (Replica rep : liveReplicas) {
ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
params.set(CoreAdminParams.CORE, rep.getStr("core"));
String nodeName = rep.getNodeName();
OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
}
}
// Wait till we have an active leader

View File

@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Objects;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@ -124,6 +125,12 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
log.warn("Leader " + core.getName() + " ignoring request to be in the recovering state because it is live and active.");
}
ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
// if the replica is waiting for leader to see recovery state, the leader should refresh its terms
if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) {
shardTerms.refreshTerms();
}
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
log.info("In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
", thisCore=" + core.getName() + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +

View File

@ -308,19 +308,20 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
// after the current one, and if there is, bail
boolean locked = recoveryLock.tryLock();
try {
if (!locked) {
if (recoveryWaiting.get() > 0) {
return;
}
recoveryWaiting.incrementAndGet();
} else {
recoveryWaiting.incrementAndGet();
cancelRecovery();
if (!locked && recoveryWaiting.get() > 0) {
return;
}
recoveryWaiting.incrementAndGet();
cancelRecovery();
recoveryLock.lock();
try {
recoveryWaiting.decrementAndGet();
// don't use recoveryLock.getQueueLength() for this
if (recoveryWaiting.decrementAndGet() > 0) {
// another recovery waiting behind us, let it run now instead of after we finish
return;
}
// to be air tight we must also check after lock
if (cc.isShutDown()) {

View File

@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -44,6 +45,7 @@ import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@ -184,6 +186,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private final boolean cloneRequiredOnLeader;
private final Replica.Type replicaType;
@Deprecated
// this flag, used for testing rolling updates, should be removed by SOLR-11812
private final boolean isOldLIRMode;
public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
}
@ -202,6 +208,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
this.ulog = req.getCore().getUpdateHandler().getUpdateLog();
this.vinfo = ulog == null ? null : ulog.getVersionInfo();
this.isOldLIRMode = !"new".equals(req.getCore().getCoreDescriptor().getCoreProperty("lirVersion", "new"));
versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null;
returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false);
@ -343,13 +350,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
List<Node> nodes = new ArrayList<>(replicaProps.size());
ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
for (ZkCoreNodeProps props : replicaProps) {
if (skipList != null) {
boolean skip = skipListSet.contains(props.getCoreUrl());
log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:" + skip);
if (!skip) {
nodes.add(new StdNode(props, collection, shardId));
}
String coreNodeName = ((Replica) props.getNodeProps()).getName();
if (skipList != null && skipListSet.contains(props.getCoreUrl())) {
log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true");
} else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
log.info("skip url:{} cause its term is less than leader", props.getCoreUrl());
} else {
nodes.add(new StdNode(props, collection, shardId));
}
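// Illustration (not part of the patch): the forwarding rule above reads as a single
// predicate. A replica is skipped if it is on the request's skip list, or if, in new-LIR
// mode, its registered term says it cannot become leader and must therefore recover
// before receiving updates. The helper name is ours; the semantics follow the hunk above
// and assume the ZkShardTerms API introduced in this commit.
static boolean shouldSkip(boolean isOldLIRMode, String coreUrl, String coreNodeName,
                          Set<String> skipListSet, ZkShardTerms zkShardTerms) {
  if (skipListSet != null && skipListSet.contains(coreUrl)) {
    return true; // explicitly excluded by the request
  }
  // a lagging term means the replica will have to recover anyway
  return !isOldLIRMode
      && zkShardTerms.registered(coreNodeName)
      && !zkShardTerms.canBecomeLeader(coreNodeName);
}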
@ -751,7 +758,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO - we may need to tell about more than one error...
List<Error> errorsForClient = new ArrayList<>(errors.size());
Map<ShardInfo, Set<String>> failedReplicas = new HashMap<>();
for (final SolrCmdDistributor.Error error : errors) {
if (error.req.node instanceof RetryNode) {
@ -843,18 +850,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
&& foundErrorNodeInReplicaList // we found an error for one of replicas
&& !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR
try {
String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName();
// if false, then the node is probably not "live" anymore
// and we do not need to send a recovery message
Throwable rootCause = SolrException.getRootCause(error.e);
log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
zkController.ensureReplicaInLeaderInitiatedRecovery(
req.getCore().getCoreContainer(),
collection,
shardId,
stdNode.getNodeProps(),
req.getCore().getCoreDescriptor(),
false /* forcePublishState */
);
if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
failedReplicas.get(shardInfo).add(coreNodeName);
} else {
// The replica did not register its term, so it must be running the old LIR implementation
log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
zkController.ensureReplicaInLeaderInitiatedRecovery(
req.getCore().getCoreContainer(),
collection,
shardId,
stdNode.getNodeProps(),
req.getCore().getCoreDescriptor(),
false /* forcePublishState */
);
}
} catch (Exception exc) {
Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
log.error("Leader failed to set replica " +
@ -873,6 +889,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
if (!isOldLIRMode) {
for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
ShardInfo shardInfo = entry.getKey();
zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
}
}
// in either case, we need to attach the achieved and min rf to the response.
if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
int achievedRf = Integer.MAX_VALUE;
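// Illustration (not part of the patch): the loop above batches failed replicas per shard
// and bumps the leader's term past theirs in one ZooKeeper update. The exact logic lives
// in ZkShardTerms; this sketch is consistent with the tests at the end of this commit,
// but omits the retry-on-znode-version-conflict handling the real implementation needs.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class TermsSketch {
  // after this, canBecomeLeader(failedReplica) stays false until the replica recovers
  // and calls setEqualsToMax() to catch back up; failed replicas simply keep their old,
  // now-lower terms (in-sync replicas are handled by the real implementation)
  static Map<String, Long> increaseTerms(Map<String, Long> terms, String leader, Set<String> failedReplicas) {
    Map<String, Long> copy = new HashMap<>(terms);
    copy.put(leader, Collections.max(copy.values()) + 1);
    return copy;
  }
}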
@ -905,6 +927,38 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
private class ShardInfo {
private String collection;
private String shard;
private String leader;
public ShardInfo(String collection, String shard, String leader) {
this.collection = collection;
this.shard = shard;
this.leader = leader;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardInfo shardInfo = (ShardInfo) o;
if (!collection.equals(shardInfo.collection)) return false;
if (!shard.equals(shardInfo.shard)) return false;
return leader.equals(shardInfo.leader);
}
@Override
public int hashCode() {
int result = collection.hashCode();
result = 31 * result + shard.hashCode();
result = 31 * result + leader.hashCode();
return result;
}
}
// must be synchronized by bucket
private void doLocalAdd(AddUpdateCommand cmd) throws IOException {

View File

@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
@ -70,18 +71,166 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
/**
* Tests that FORCELEADER can get an active leader even when only replicas with terms lower than the leader's are live
*/
@Test
@Slow
public void testReplicasInLowerTerms() throws Exception {
handle.put("maxScore", SKIPVAL);
handle.put("timestamp", SKIPVAL);
String testCollectionName = "forceleader_lower_terms_collection";
createCollection(testCollectionName, "conf1", 1, 3, 1);
cloudClient.setDefaultCollection(testCollectionName);
try {
List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
assertEquals("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(testCollectionName), 2, notLeaders.size());
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
ZkController zkController = notLeader0.getCoreContainer().getZkController();
log.info("Before put non leaders into lower term: " + printClusterStateInfo());
putNonLeadersIntoLowerTerm(testCollectionName, SHARD1, zkController, leader, notLeaders);
for (Replica replica : notLeaders) {
waitForState(testCollectionName, replica.getName(), State.DOWN, 60000);
}
waitForState(testCollectionName, leader.getName(), State.DOWN, 60000);
cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
"; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
int numReplicasOnLiveNodes = 0;
for (Replica rep : clusterState.getCollection(testCollectionName).getSlice(SHARD1).getReplicas()) {
if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
numReplicasOnLiveNodes++;
}
}
assertEquals(2, numReplicasOnLiveNodes);
log.info("Before forcing leader: " + printClusterStateInfo());
// Assert there is no leader yet
assertNull("Expected no leader right now. State: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1),
clusterState.getCollection(testCollectionName).getSlice(SHARD1).getLeader());
assertSendDocFails(3);
log.info("Do force leader...");
doForceLeader(cloudClient, testCollectionName, SHARD1);
// By now we have an active leader. Wait for recoveries to begin
waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
clusterState = cloudClient.getZkStateReader().getClusterState();
log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
// we have a leader
Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
assertNotNull(newLeader);
// leader is active
assertEquals(State.ACTIVE, newLeader.getState());
numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
assertEquals(2, numActiveReplicas);
// Assert that indexing works again
log.info("Sending doc 4...");
sendDoc(4);
log.info("Committing...");
cloudClient.commit();
log.info("Doc 4 sent and commit issued");
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 4, 4);
// Docs 1 and 4 should be here. Doc 2 was lost during the partition and doc 3 failed to index.
log.info("Checking doc counts...");
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("q", "*:*");
assertEquals("Expected only 2 documents in the index", 2, cloudClient.query(params).getResults().getNumFound());
bringBackOldLeaderAndSendDoc(testCollectionName, leader, notLeaders, 5);
} finally {
log.info("Cleaning up after the test.");
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
}
void putNonLeadersIntoLowerTerm(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
for (int i = 0; i < notLeaders.size(); i++)
nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
sendDoc(1);
// ok, now introduce a network partition between the leader and both replicas
log.info("Closing proxies for the non-leader replicas...");
for (SocketProxy proxy : nonLeaderProxies)
proxy.close();
getProxyForReplica(leader).close();
// indexing during a partition
log.info("Sending a doc during the network partition...");
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
sendDoc(2, null, leaderJetty);
for (Replica replica : notLeaders) {
waitForState(collectionName, replica.getName(), State.DOWN, 60000);
}
// Kill the leader
log.info("Killing leader for shard1 of " + collectionName + " on node " + leader.getNodeName() + "");
leaderJetty.stop();
// Wait for a steady state, until the shard is leaderless
log.info("Sleep and periodically wake up to check for state...");
for (int i = 0; i < 20; i++) {
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
boolean allDown = true;
for (Replica replica : clusterState.getCollection(collectionName).getSlice(shard).getReplicas()) {
if (replica.getState() != State.DOWN) {
allDown = false;
}
}
if (allDown && clusterState.getCollection(collectionName).getSlice(shard).getLeader() == null) {
break;
}
Thread.sleep(1000);
}
log.info("Waking up...");
// remove the network partition
log.info("Reopening the proxies for the non-leader replicas...");
for (SocketProxy proxy : nonLeaderProxies)
proxy.reopen();
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, shard, cloudClient.getZkStateReader().getZkClient())) {
for (Replica notLeader : notLeaders) {
assertTrue(zkShardTerms.getTerm(leader.getName()) > zkShardTerms.getTerm(notLeader.getName()));
}
}
}
/**
* Tests that FORCELEADER can get an active leader after the leader puts all replicas into LIR and itself goes down,
* resulting in a leaderless shard.
*/
@Test
@Slow
// TODO: remove in SOLR-11812
public void testReplicasInLIRNoLeader() throws Exception {
handle.put("maxScore", SKIPVAL);
handle.put("timestamp", SKIPVAL);
String testCollectionName = "forceleader_test_collection";
createCollection(testCollectionName, "conf1", 1, 3, 1);
createOldLirCollection(testCollectionName, 3);
cloudClient.setDefaultCollection(testCollectionName);
try {
@ -157,6 +306,28 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
}
private void createOldLirCollection(String collection, int numReplicas) throws IOException, SolrServerException {
if (onlyLeaderIndexes) {
CollectionAdminRequest
.createCollection(collection, "conf1", 1, 0, numReplicas, 0)
.setCreateNodeSet("")
.process(cloudClient);
} else {
CollectionAdminRequest.createCollection(collection, "conf1", 1, numReplicas)
.setCreateNodeSet("")
.process(cloudClient);
}
Properties oldLir = new Properties();
oldLir.setProperty("lirVersion", "old");
for (int i = 0; i < numReplicas; i++) {
// this is the only way to create replicas that run the old LIR implementation
CollectionAdminRequest
.addReplicaToShard(collection, "shard1", onlyLeaderIndexes? Replica.Type.TLOG: Replica.Type.NRT)
.setProperties(oldLir)
.process(cloudClient);
}
}
/**
* Test that FORCELEADER can set last published state of all down (live) replicas to active (so
* that they become worthy candidates for leader election).
@ -167,7 +338,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
handle.put("timestamp", SKIPVAL);
String testCollectionName = "forceleader_last_published";
createCollection(testCollectionName, "conf1", 1, 3, 1);
createOldLirCollection(testCollectionName, 3);
cloudClient.setDefaultCollection(testCollectionName);
log.info("Collection created: " + testCollectionName);
@ -204,33 +375,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
}
protected void unsetLeader(String collection, String slice) throws Exception {
ZkDistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, slice,
ZkStateReader.COLLECTION_PROP, collection);
inQueue.offer(Utils.toJSON(m));
ClusterState clusterState = null;
boolean transition = false;
for (int counter = 10; counter > 0; counter--) {
clusterState = zkStateReader.getClusterState();
Replica newLeader = clusterState.getCollection(collection).getSlice(slice).getLeader();
if (newLeader == null) {
transition = true;
break;
}
Thread.sleep(1000);
}
if (!transition) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not unset replica leader" +
". Cluster state: " + printClusterStateInfo(collection));
}
}
protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws Exception {
DistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
@ -263,23 +407,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
". Last known state of the replica: " + replicaState);
}
}
/*protected void setLastPublishedState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException,
KeeperException, InterruptedException {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
params.set(CoreAdminParams.CORE, replica.getStr("core"));
params.set(ZkStateReader.STATE_PROP, state.toString());
SolrRequest<SimpleSolrResponse> req = new GenericSolrRequest(METHOD.GET, "/admin/cores", params);
NamedList resp = null;
try (HttpSolrClient hsc = new HttpSolrClient(baseUrl)) {
resp = hsc.request(req);
}
}*/
protected Replica.State getLastPublishedState(String collection, String slice, Replica replica) throws SolrServerException, IOException,
KeeperException, InterruptedException {
@ -377,6 +504,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
// Bring back the leader which was stopped
log.info("Bringing back originally killed leader...");
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
getProxyForReplica(leader).reopen();
leaderJetty.start();
waitForRecoveriesToFinish(collection, cloudClient.getZkStateReader(), true);
cloudClient.getZkStateReader().forceUpdateCollection(collection);

View File

@ -30,7 +30,7 @@ import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.List;
public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest {
public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -38,7 +38,7 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
private final boolean onlyLeaderIndexes = random().nextBoolean();
public LeaderInitiatedRecoveryOnCommitTest() {
public HttpPartitionOnCommitTest() {
super();
sliceCount = 1;
fixShardCount(4);

View File

@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@ -49,15 +50,21 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.Replica.State.DOWN;
import static org.apache.solr.common.cloud.Replica.State.RECOVERING;
/**
* Simulates HTTP partitions between a leader and replica but the replica does
* not lose its ZooKeeper connection.
@ -121,6 +128,8 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
testLeaderInitiatedRecoveryCRUD();
testDoRecoveryOnRestart();
// Tests that if we set a minRf that's not satisfied, no recovery is requested, but if minRf is satisfied,
// recovery is requested
testMinRf();
@ -147,8 +156,9 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
}
/**
* Tests handling of lir state znodes.
* Tests handling of different formats of LIR nodes
*/
// TODO: remove in SOLR-11812
protected void testLeaderInitiatedRecoveryCRUD() throws Exception {
String testCollectionName = "c8n_crud_1x2";
String shardId = "shard1";
@ -182,11 +192,11 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
};
}
};
zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, cd, true);
zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), DOWN, cd, true);
Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
assertSame(DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
// test old non-json format handling
SolrZkClient zkClient = zkController.getZkClient();
@ -194,13 +204,64 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
zkClient.setData(znodePath, "down".getBytes(StandardCharsets.UTF_8), true);
lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
assertSame(DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
zkClient.delete(znodePath, -1, false);
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
private void testDoRecoveryOnRestart() throws Exception {
String testCollectionName = "collDoRecoveryOnRestart";
try {
// Inject a pause into the recovery op so the replica won't be able to finish recovery
System.setProperty("solr.cloud.wait-for-updates-with-stale-state-pause", String.valueOf(Integer.MAX_VALUE));
createCollection(testCollectionName, "conf1", 1, 2, 1);
cloudClient.setDefaultCollection(testCollectionName);
sendDoc(1, 2);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy0.close();
leaderProxy.close();
// indexing during a partition
int achievedRf = sendDoc(2, 1, leaderJetty);
assertEquals("Unexpected achieved replication factor", 1, achievedRf);
try (ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, "shard1", cloudClient.getZkStateReader().getZkClient())) {
assertFalse(zkShardTerms.canBecomeLeader(notLeaders.get(0).getName()));
}
waitForState(testCollectionName, notLeaders.get(0).getName(), DOWN, 10000);
// heal partition
proxy0.reopen();
leaderProxy.reopen();
waitForState(testCollectionName, notLeaders.get(0).getName(), RECOVERING, 10000);
System.clearProperty("solr.cloud.wait-for-updates-with-stale-state-pause");
JettySolrRunner notLeaderJetty = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
ChaosMonkey.stop(notLeaderJetty);
ChaosMonkey.start(notLeaderJetty);
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, 100);
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 2);
} finally {
System.clearProperty("solr.cloud.wait-for-updates-with-stale-state-pause");
}
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
protected void testMinRf() throws Exception {
// create a collection that has 1 shard and 3 replicas
String testCollectionName = "collMinRf_1x3";
@ -209,6 +270,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
sendDoc(1, 2);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
assertTrue("Expected 2 non-leader replicas for collection " + testCollectionName
@ -221,27 +283,21 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
// Now introduce a network partition between the leader and 1 replica, so a minRf of 2 is still achieved
log.info("partitioning replica : " + notLeaders.get(0));
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy0.close();
// the leader can still connect to replica 2; by closing leaderProxy, replica 1 cannot recover
leaderProxy.close();
// indexing during a partition
int achievedRf = sendDoc(2, 2);
int achievedRf = sendDoc(2, 2, leaderJetty);
assertEquals("Unexpected achieved replication factor", 2, achievedRf);
try (ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, "shard1", cloudClient.getZkStateReader().getZkClient())) {
assertFalse(zkShardTerms.canBecomeLeader(notLeaders.get(0).getName()));
}
Thread.sleep(sleepMsBeforeHealPartition);
// Verify that the partitioned replica is DOWN
ZkStateReader zkr = cloudClient.getZkStateReader();
zkr.forceUpdateCollection(testCollectionName); // force the state to be fresh
ClusterState cs = zkr.getClusterState();
Collection<Slice> slices = cs.getCollection(testCollectionName).getActiveSlices();
Slice slice = slices.iterator().next();
Replica partitionedReplica = slice.getReplica(notLeaders.get(0).getName());
assertEquals("The partitioned replica did not get marked down",
Replica.State.DOWN.toString(), partitionedReplica.getStr(ZkStateReader.STATE_PROP));
log.info("un-partitioning replica : " + notLeaders.get(0));
proxy0.reopen();
leaderProxy.reopen();
notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@ -254,8 +310,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
proxy0.close();
SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
proxy1.close();
leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
leaderProxy.close();
achievedRf = sendDoc(3, 2);
achievedRf = sendDoc(3, 2, leaderJetty);
assertEquals("Unexpected achieved replication factor", 1, achievedRf);
Thread.sleep(sleepMsBeforeHealPartition);
@ -265,6 +323,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
proxy0.reopen();
proxy1.reopen();
leaderProxy.reopen();
notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@ -299,30 +358,32 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
Replica notLeader =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive).get(0);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy = getProxyForReplica(notLeader);
SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy.close();
leaderProxy.close();
// indexing during a partition
sendDoc(2);
// Have the partition last at least 1 sec
// While this gives the impression that recovery is timing related, this is really
// only to give time for the state to be written to ZK before the test completes.
// In other words, without a brief pause, the test finishes so quickly that it
// doesn't give time for the recovery process to kick in.
Thread.sleep(sleepMsBeforeHealPartition);
sendDoc(2, null, leaderJetty);
// the replica should publish itself as DOWN if the network is not healed after some amount of time
waitForState(testCollectionName, notLeader.getName(), DOWN, 10000);
proxy.reopen();
leaderProxy.reopen();
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
sendDoc(3);
int achievedRf = sendDoc(3);
if (achievedRf == 1) {
// this case can happen when the leader reuses a connection established before the network partition
// TODO: Remove when SOLR-11776 gets committed
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
}
// sent 3 docs in so far, verify they are on the leader and replica
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 3);
@ -349,21 +410,25 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
if (d % 10 == 0) {
if (hasPartition) {
proxy.reopen();
leaderProxy.reopen();
hasPartition = false;
} else {
if (d >= 10) {
proxy.close();
leaderProxy.close();
hasPartition = true;
Thread.sleep(sleepMsBeforeHealPartition);
}
}
}
sendDoc(d + 4); // 4 is offset as we've already indexed 1-3
// always send doc directly to leader without going through proxy
sendDoc(d + 4, null, leaderJetty); // 4 is offset as we've already indexed 1-3
}
// restore connectivity if lost
if (hasPartition) {
proxy.reopen();
leaderProxy.reopen();
}
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
@ -384,7 +449,24 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
protected void waitForState(String collection, String replicaName, Replica.State state, long ms) throws KeeperException, InterruptedException {
TimeOut timeOut = new TimeOut(ms, TimeUnit.MILLISECONDS, TimeSource.CURRENT_TIME);
Replica.State replicaState = Replica.State.ACTIVE;
while (!timeOut.hasTimedOut()) {
ZkStateReader zkr = cloudClient.getZkStateReader();
zkr.forceUpdateCollection(collection); // force the state to be fresh
ClusterState cs = zkr.getClusterState();
Collection<Slice> slices = cs.getCollection(collection).getActiveSlices();
Slice slice = slices.iterator().next();
Replica partitionedReplica = slice.getReplica(replicaName);
replicaState = partitionedReplica.getState();
if (replicaState == state) return;
}
assertEquals("Timeout waiting for state "+ state +" of replica " + replicaName + ", current state " + replicaState,
state, replicaState);
}
protected void testRf3() throws Exception {
// create a collection that has 1 shard but 2 replicas
String testCollectionName = "c8n_1x3";
@ -400,27 +482,30 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(testCollectionName),
notLeaders.size() == 2);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy0.close();
leaderProxy.close();
// indexing during a partition
sendDoc(2);
sendDoc(2, null, leaderJetty);
Thread.sleep(sleepMsBeforeHealPartition);
proxy0.reopen();
SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
proxy1.close();
sendDoc(3);
sendDoc(3, null, leaderJetty);
Thread.sleep(sleepMsBeforeHealPartition);
proxy1.reopen();
leaderProxy.reopen();
// sent 4 docs in so far, verify they are on the leader and replica
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@ -578,8 +663,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
protected int sendDoc(int docId) throws Exception {
return sendDoc(docId, null);
}
// Send doc directly to a server (without going through proxy)
protected int sendDoc(int docId, Integer minRf, JettySolrRunner leaderJetty) throws IOException, SolrServerException {
try (HttpSolrClient solrClient = new HttpSolrClient.Builder(leaderJetty.getBaseUrl().toString()).build()) {
return sendDoc(docId, minRf, solrClient, cloudClient.getDefaultCollection());
}
}
protected int sendDoc(int docId, Integer minRf) throws Exception {
return sendDoc(docId, minRf, cloudClient, cloudClient.getDefaultCollection());
}
protected int sendDoc(int docId, Integer minRf, SolrClient solrClient, String collection) throws IOException, SolrServerException {
SolrInputDocument doc = new SolrInputDocument();
doc.addField(id, String.valueOf(docId));
doc.addField("a_t", "hello" + docId);
@ -589,8 +685,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
}
up.add(doc);
return cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
return cloudClient.getMinAchievedReplicationFactor(collection, solrClient.request(up, collection));
}
/**

View File

@ -0,0 +1,457 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LIRRollingUpdatesTest extends SolrCloudTestCase {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static Map<URI, SocketProxy> proxies;
private static Map<URI, JettySolrRunner> jettys;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(3)
.addConfig("conf", configset("cloud-minimal"))
.configure();
// Add proxies
proxies = new HashMap<>(cluster.getJettySolrRunners().size());
jettys = new HashMap<>(cluster.getJettySolrRunners().size());
for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
SocketProxy proxy = new SocketProxy();
jetty.setProxyPort(proxy.getListenPort());
cluster.stopJettySolrRunner(jetty); // TODO: Can we avoid this restart?
cluster.startJettySolrRunner(jetty);
proxy.open(jetty.getBaseUrl().toURI());
LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
proxies.put(proxy.getUrl(), proxy);
jettys.put(proxy.getUrl(), jetty);
}
}
@AfterClass
public static void tearDownCluster() throws Exception {
for (SocketProxy proxy:proxies.values()) {
proxy.close();
}
proxies = null;
jettys = null;
}
@Test
public void testNewReplicaOldLeader() throws Exception {
String collection = "testNewReplicaOldLeader";
CollectionAdminRequest.createCollection(collection, 1, 2)
.setCreateNodeSet("")
.process(cluster.getSolrClient());
Properties oldLir = new Properties();
oldLir.setProperty("lirVersion", "old");
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(1).getNodeName())
.process(cluster.getSolrClient());
addDocs(collection, 2, 0);
Slice shard1 = getCollectionState(collection).getSlice("shard1");
//introduce network partition between leader & replica
Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
getProxyForReplica(notLeader).close();
getProxyForReplica(shard1.getLeader()).close();
addDoc(collection, 2, getJettyForReplica(shard1.getLeader()));
waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
(liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
getProxyForReplica(shard1.getLeader()).reopen();
getProxyForReplica(notLeader).reopen();
// make sure that, when a new replica works with an old leader, it can still recover normally
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
// make sure that, when a new replica restarts during LIR, it can still recover normally (by looking at the LIR node)
getProxyForReplica(notLeader).close();
getProxyForReplica(shard1.getLeader()).close();
addDoc(collection, 3, getJettyForReplica(shard1.getLeader()));
waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
(liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
JettySolrRunner notLeaderJetty = getJettyForReplica(notLeader);
notLeaderJetty.stop();
waitForState("Node did not leave", collection, (liveNodes, collectionState) -> liveNodes.size() == 2);
upgrade(notLeaderJetty);
notLeaderJetty.start();
getProxyForReplica(shard1.getLeader()).reopen();
getProxyForReplica(notLeader).reopen();
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
assertFalse(runInOldLIRMode(collection, "shard1", notLeader));
assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 3);
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
public void testNewLeaderOldReplica() throws Exception {
// in the case of a new leader & an old replica, the new leader can still put the old replica into LIR
String collection = "testNewLeaderOldReplica";
CollectionAdminRequest.createCollection(collection, 1, 2)
.setCreateNodeSet("")
.process(cluster.getSolrClient());
Properties oldLir = new Properties();
oldLir.setProperty("lirVersion", "old");
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
waitForState("Timeout waiting for shard1 become active", collection, (liveNodes, collectionState) -> {
Slice shard1 = collectionState.getSlice("shard1");
return shard1.getReplicas().size() == 1 && shard1.getLeader() != null;
});
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(1).getNodeName())
.process(cluster.getSolrClient());
Slice shard1 = getCollectionState(collection).getSlice("shard1");
Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
Replica leader = shard1.getLeader();
assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
assertFalse(runInOldLIRMode(collection, "shard1", leader));
addDocs(collection, 2, 0);
getProxyForReplica(notLeader).close();
getProxyForReplica(leader).close();
JettySolrRunner leaderJetty = getJettyForReplica(leader);
addDoc(collection, 2, leaderJetty);
waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
(liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
// wait a little bit
Thread.sleep(500);
getProxyForReplica(notLeader).reopen();
getProxyForReplica(leader).reopen();
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
// ensure that after recovery, the upgraded replica cleans up its LIR status because it is no longer needed
assertFalse(cluster.getSolrClient().getZkStateReader().getZkClient().exists(
ZkController.getLeaderInitiatedRecoveryZnodePath(collection, "shard1", notLeader.getName()), true));
// ensure that the leader does not register the other replica's term
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
assertFalse(zkShardTerms.getTerms().containsKey(notLeader.getName()));
}
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
public void testLeaderAndMixedReplicas(boolean leaderInOldMode) throws Exception {
// with a new leader and a mix of old and new replicas, the new leader can still put all of them into recovery
// step1 : setup collection
String collection = "testMixedReplicas-"+leaderInOldMode;
CollectionAdminRequest.createCollection(collection, 1, 2)
.setCreateNodeSet("")
.process(cluster.getSolrClient());
Properties oldLir = new Properties();
oldLir.setProperty("lirVersion", "old");
if (leaderInOldMode) {
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
} else {
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
}
waitForState("Timeout waiting for shard1 become active", collection, clusterShape(1, 1));
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(1).getNodeName())
.process(cluster.getSolrClient());
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setNode(cluster.getJettySolrRunner(2).getNodeName())
.process(cluster.getSolrClient());
Slice shard1 = getCollectionState(collection).getSlice("shard1");
Replica replicaInOldMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
Replica replicaInNewMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(1);
Replica leader = shard1.getLeader();
assertEquals(leaderInOldMode, runInOldLIRMode(collection, "shard1", leader));
if (!runInOldLIRMode(collection, "shard1", replicaInOldMode)) {
Replica temp = replicaInOldMode;
replicaInOldMode = replicaInNewMode;
replicaInNewMode = temp;
}
assertTrue(runInOldLIRMode(collection, "shard1", replicaInOldMode));
assertFalse(runInOldLIRMode(collection, "shard1", replicaInNewMode));
addDocs(collection, 2, 0);
// step2 : introduce a network partition, then add a doc; replicas should be put into recovery
getProxyForReplica(replicaInOldMode).close();
getProxyForReplica(replicaInNewMode).close();
getProxyForReplica(leader).close();
JettySolrRunner leaderJetty = getJettyForReplica(leader);
addDoc(collection, 2, leaderJetty);
Replica finalReplicaInOldMode = replicaInOldMode;
waitForState("Replica " + replicaInOldMode.getName() + " is not put as DOWN", collection,
(liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplica(finalReplicaInOldMode.getName()).getState() == Replica.State.DOWN);
// wait a little bit
Thread.sleep(500);
getProxyForReplica(replicaInOldMode).reopen();
getProxyForReplica(replicaInNewMode).reopen();
getProxyForReplica(leader).reopen();
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 2);
addDocs(collection, 3, 3);
// ensure that the leader does not register the old-mode replica's term
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
assertFalse(zkShardTerms.getTerms().containsKey(replicaInOldMode.getName()));
}
// step3 : upgrade the replica running in old mode to the new mode
getProxyForReplica(leader).close();
getProxyForReplica(replicaInOldMode).close();
addDoc(collection, 6, leaderJetty);
JettySolrRunner oldJetty = getJettyForReplica(replicaInOldMode);
oldJetty.stop();
waitForState("Node did not leave", collection, (liveNodes, collectionState)
-> liveNodes.size() == 2);
upgrade(oldJetty);
oldJetty.start();
getProxyForReplica(leader).reopen();
getProxyForReplica(replicaInOldMode).reopen();
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 6);
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
@Test
public void testNewLeaderAndMixedReplicas() throws Exception {
testLeaderAndMixedReplicas(false);
}
@Test
public void testOldLeaderAndMixedReplicas() throws Exception {
testLeaderAndMixedReplicas(true);
}
private void upgrade(JettySolrRunner solrRunner) {
File[] corePaths = new File(solrRunner.getSolrHome()).listFiles();
for (File corePath : corePaths) {
File coreProperties = new File(corePath, "core.properties");
if (!coreProperties.exists()) continue;
Properties properties = new Properties();
try (Reader reader = new InputStreamReader(new FileInputStream(coreProperties), "UTF-8")) {
properties.load(reader);
} catch (Exception e) {
continue;
}
properties.remove("lirVersion");
try (Writer writer = new OutputStreamWriter(new FileOutputStream(coreProperties), "UTF-8")) {
properties.store(writer, "Upgraded");
} catch (Exception e) {
continue;
}
}
}
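// Usage note (ours, not in the patch): upgrade() simulates a rolling upgrade by deleting
// the lirVersion=old opt-out from each core.properties while the node is down; on the
// next start the core registers a shard term and joins the new design, e.g.:
//   notLeaderJetty.stop();     // node must be down while core.properties is edited
//   upgrade(notLeaderJetty);   // strips lirVersion=old from every core found
//   notLeaderJetty.start();    // core now registers a term and runs the new LIR path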
protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
String testCollectionName, int firstDocId, int lastDocId)
throws Exception {
Replica leader =
cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName);
List<HttpSolrClient> replicas =
new ArrayList<HttpSolrClient>(notLeaders.size());
for (Replica r : notLeaders) {
replicas.add(getHttpSolrClient(r, testCollectionName));
}
try {
for (int d = firstDocId; d <= lastDocId; d++) {
String docId = String.valueOf(d);
assertDocExists(leaderSolr, testCollectionName, docId);
for (HttpSolrClient replicaSolr : replicas) {
assertDocExists(replicaSolr, testCollectionName, docId);
}
}
} finally {
if (leaderSolr != null) {
leaderSolr.close();
}
for (HttpSolrClient replicaSolr : replicas) {
replicaSolr.close();
}
}
}
protected void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception {
NamedList rsp = realTimeGetDocId(solr, docId);
String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId);
assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+ " due to: " + match + "; rsp="+rsp, match == null);
}
private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
return solr.request(qr);
}
protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
String url = zkProps.getBaseUrl() + "/" + coll;
return getHttpSolrClient(url);
}
private <T> void waitFor(int waitTimeInSecs, T expected, Supplier<T> supplier) throws InterruptedException {
TimeOut timeOut = new TimeOut(waitTimeInSecs, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
while (!timeOut.hasTimedOut()) {
if (expected.equals(supplier.get())) return; // equals, not ==, to be safe with autoboxed values
Thread.sleep(100);
}
assertEquals(expected, supplier.get());
}
private boolean runInOldLIRMode(String collection, String shard, Replica replica) {
try (ZkShardTerms shardTerms = new ZkShardTerms(collection, shard, cluster.getZkClient())) {
return !shardTerms.registered(replica.getName());
}
}
private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) {
solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId)));
}
}
private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException {
List<SolrInputDocument> docs = new ArrayList<>(numDocs);
for (int i = 0; i < numDocs; i++) {
int id = startId + i;
docs.add(new SolrInputDocument("id", String.valueOf(id), "fieldName_s", String.valueOf(id)));
}
cluster.getSolrClient().add(collection, docs);
cluster.getSolrClient().commit(collection);
}
protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
URL baseUrl = new URL(replicaBaseUrl);
JettySolrRunner jetty = jettys.get(baseUrl.toURI());
assertNotNull("No Jetty found for " + baseUrl + "!", jetty);
return jetty;
}
protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
URL baseUrl = new URL(replicaBaseUrl);
SocketProxy proxy = proxies.get(baseUrl.toURI());
if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
baseUrl = new URL(baseUrl.toExternalForm() + "/");
proxy = proxies.get(baseUrl.toURI());
}
assertNotNull("No proxy found for " + baseUrl + "!", proxy);
return proxy;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Properties;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.LuceneTestCase.Slow;
@ -26,6 +27,7 @@ import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.SolrZkClient;
@ -45,6 +47,7 @@ import org.slf4j.LoggerFactory;
@Slow
@Nightly
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10071")
@Deprecated
public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -86,7 +89,14 @@ public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistr
String testCollectionName = "all_in_lir";
String shardId = "shard1";
createCollection(testCollectionName, "conf1", 1, 3, 1);
CollectionAdminRequest.createCollection(testCollectionName, "conf1", 1, 3)
.setCreateNodeSet("")
.process(cloudClient);
Properties oldLir = new Properties();
oldLir.setProperty("lirVersion", "old");
for (int i = 0; i < 3; i++) {
CollectionAdminRequest.addReplicaToShard(testCollectionName, "shard1").setProperties(oldLir).process(cloudClient);
}
waitForRecoveriesToFinish(testCollectionName, false);

View File

@ -34,6 +34,7 @@ import org.apache.zookeeper.data.Stat;
/**
* Test for {@link LeaderInitiatedRecoveryThread}
*/
@Deprecated
@SolrTestCaseJ4.SuppressSSL
public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTestBase {

View File

@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZkShardTermsTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(1)
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
}
public void testParticipationOfReplicas() throws IOException, SolrServerException, InterruptedException {
String collection = "collection1";
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard2", cluster.getZkClient())) {
zkShardTerms.registerTerm("replica1");
zkShardTerms.registerTerm("replica2");
zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
}
// When a new collection is created, the old term nodes will be removed
CollectionAdminRequest.createCollection(collection, 2, 2)
.setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
.setMaxShardsPerNode(1000)
.process(cluster.getSolrClient());
ZkController zkController = cluster.getJettySolrRunners().get(0).getCoreContainer().getZkController();
waitFor(2, () -> zkController.getShardTerms(collection, "shard1").getTerms().size());
assertArrayEquals(new Long[]{0L, 0L}, zkController.getShardTerms(collection, "shard1").getTerms().values().toArray(new Long[2]));
waitFor(2, () -> zkController.getShardTerms(collection, "shard2").getTerms().size());
assertArrayEquals(new Long[]{0L, 0L}, zkController.getShardTerms(collection, "shard2").getTerms().values().toArray(new Long[2]));
}
public void testRegisterTerm() throws InterruptedException {
String collection = "registerTerm";
ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
ZkShardTerms rep2Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
rep1Terms.registerTerm("rep1");
rep2Terms.registerTerm("rep2");
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
assertEquals(0L, zkShardTerms.getTerm("rep1"));
assertEquals(0L, zkShardTerms.getTerm("rep2"));
}
waitFor(2, () -> rep1Terms.getTerms().size());
rep1Terms.ensureTermsIsHigher("rep1", Collections.singleton("rep2"));
assertEquals(1L, rep1Terms.getTerm("rep1"));
assertEquals(0L, rep1Terms.getTerm("rep2"));
// assert registerTerm does not override the current value
rep1Terms.registerTerm("rep1");
assertEquals(1L, rep1Terms.getTerm("rep1"));
waitFor(1L, () -> rep2Terms.getTerm("rep1"));
rep2Terms.setEqualsToMax("rep2");
assertEquals(1L, rep2Terms.getTerm("rep2"));
rep2Terms.registerTerm("rep2");
assertEquals(1L, rep2Terms.getTerm("rep2"));
// zkShardTerms must stay updated by watcher
Map<String, Long> expectedTerms = new HashMap<>();
expectedTerms.put("rep1", 1L);
expectedTerms.put("rep2", 1L);
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
while (!timeOut.hasTimedOut()) {
if (Objects.equals(expectedTerms, rep1Terms.getTerms()) && Objects.equals(expectedTerms, rep2Terms.getTerms())) break;
Thread.sleep(100); // avoid busy-spinning while the watchers propagate updates
}
if (timeOut.hasTimedOut()) fail("Expected zkShardTerms to stay updated");
rep1Terms.close();
rep2Terms.close();
}
@Test
public void testRaceConditionOnUpdates() throws InterruptedException {
String collection = "raceConditionOnUpdates";
List<String> replicas = Arrays.asList("rep1", "rep2", "rep3", "rep4");
for (String replica : replicas) {
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
zkShardTerms.registerTerm(replica);
}
}
List<String> failedReplicas = new ArrayList<>(replicas);
Collections.shuffle(failedReplicas, random());
while (failedReplicas.size() > 2) {
failedReplicas.remove(0);
}
AtomicBoolean stop = new AtomicBoolean(false);
Thread[] threads = new Thread[failedReplicas.size()];
for (int i = 0; i < failedReplicas.size(); i++) {
String replica = failedReplicas.get(i);
threads[i] = new Thread(() -> {
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
while (!stop.get()) {
try {
Thread.sleep(random().nextInt(200));
zkShardTerms.setEqualsToMax(replica);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
threads[i].start();
}
long maxTerm = 0;
try (ZkShardTerms shardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
shardTerms.registerTerm("leader");
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
while (!timeOut.hasTimedOut()) {
maxTerm++;
assertEquals(shardTerms.getTerms().get("leader"), Collections.max(shardTerms.getTerms().values()));
Thread.sleep(100);
}
assertTrue(maxTerm >= Collections.max(shardTerms.getTerms().values()));
}
stop.set(true);
for (Thread thread : threads) {
thread.join();
}
}
public void testCoreTermWatcher() throws InterruptedException {
String collection = "coreTermWatcher";
ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
leaderTerms.registerTerm("leader");
ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
AtomicInteger count = new AtomicInteger(0);
// this listener will get called exactly 3 times before it is removed
ZkShardTerms.CoreTermWatcher watcher = terms -> count.incrementAndGet() < 3;
replicaTerms.addListener(watcher);
replicaTerms.registerTerm("replica");
waitFor(1, count::get);
leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
waitFor(2, count::get);
replicaTerms.setEqualsToMax("replica");
waitFor(3, count::get);
assertEquals(0, replicaTerms.getNumListeners());
leaderTerms.close();
replicaTerms.close();
}
public void testEnsureTermsIsHigher() {
Map<String, Long> map = new HashMap<>();
map.put("leader", 0L);
ZkShardTerms.Terms terms = new ZkShardTerms.Terms(map, 0);
terms = terms.increaseTerms("leader", Collections.singleton("replica"));
assertEquals(1L, terms.getTerm("leader").longValue());
}
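// Illustrative sketch (ours, not in the patch): taken together, the assertions above
// imply the following term lifecycle, using only API calls exercised in this class.
// Collection, shard, and replica names are illustrative.
private void termLifecycleSketch() {
  try (ZkShardTerms terms = new ZkShardTerms("collection1", "shard1", cluster.getZkClient())) {
    terms.registerTerm("leader");  // leader  -> 0
    terms.registerTerm("replica"); // replica -> 0
    // the leader fails to forward an update to "replica": bump the leader's term past it
    terms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
    assertEquals(1L, terms.getTerm("leader"));
    assertFalse(terms.canBecomeLeader("replica")); // lagging term: must recover first
    terms.setEqualsToMax("replica");               // recovered: replica -> 1, eligible again
  }
}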
private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException {
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
while (!timeOut.hasTimedOut()) {
if (Objects.equals(expected, supplier.get())) return; // equals(), not ==: reference comparison breaks for boxed values outside the autobox cache
Thread.sleep(100);
}
assertEquals(expected, supplier.get());
}
}

View File

@ -42,7 +42,7 @@ import org.apache.solr.client.solrj.request.schema.SchemaRequest.Field;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
@ -908,23 +908,20 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
commit();
// TODO: Could try checking ZK for LIR flags to ensure LIR has not kicked in
// Check every 10ms, 100 times, for a replica to go down (& assert that it doesn't)
ZkController zkController = shardToLeaderJetty.get(SHARD1).jetty.getCoreContainer().getZkController();
String lirPath = zkController.getLeaderInitiatedRecoveryZnodePath(DEFAULT_TEST_COLLECTION_NAME, SHARD1);
assertFalse (zkController.getZkClient().exists(lirPath, true));
try (ZkShardTerms zkShardTerms = new ZkShardTerms(DEFAULT_COLLECTION, SHARD1, cloudClient.getZkStateReader().getZkClient())) {
for (int i=0; i<100; i++) {
Thread.sleep(10);
cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
ClusterState state = cloudClient.getZkStateReader().getClusterState();
for (int i=0; i<100; i++) {
Thread.sleep(10);
cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
ClusterState state = cloudClient.getZkStateReader().getClusterState();
int numActiveReplicas = 0;
for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
if (rep.getState().equals(Replica.State.ACTIVE))
numActiveReplicas++;
assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
int numActiveReplicas = 0;
for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas()) {
assertTrue(zkShardTerms.canBecomeLeader(rep.getName()));
if (rep.getState().equals(Replica.State.ACTIVE))
numActiveReplicas++;
}
assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
}
}
for (SolrClient client: new SolrClient[] {LEADER, NONLEADERS.get(0),

View File

@ -1963,6 +1963,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
long waitMs = 0L;
long maxWaitMs = maxWaitSecs * 1000L;
Replica leader = null;
ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, shardId, cloudClient.getZkStateReader().getZkClient());
while (waitMs < maxWaitMs && !allReplicasUp) {
cs = cloudClient.getZkStateReader().getClusterState();
assertNotNull(cs);
@ -1981,7 +1982,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
// ensure all replicas are "active" and identify the non-leader replica
for (Replica replica : replicas) {
if (replica.getState() != Replica.State.ACTIVE) {
if (!zkShardTerms.canBecomeLeader(replica.getName()) ||
replica.getState() != Replica.State.ACTIVE) {
log.info("Replica {} is currently {}", replica.getName(), replica.getState());
allReplicasUp = false;
}
@ -1998,6 +2000,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
} // end while
zkShardTerms.close();
if (!allReplicasUp)
fail("Didn't see all replicas for shard "+shardId+" in "+testCollectionName+
" come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());