SOLR-12011: Consistence problem when in-sync replicas are DOWN

This commit is contained in:
Cao Manh Dat 2018-03-04 12:57:05 +07:00
parent ad7e94afb2
commit 9de4225e9a
16 changed files with 595 additions and 144 deletions

View File

@ -78,6 +78,9 @@ Upgrade Notes
* LUCENE-8161: If you are using the spatial JTS library with Solr, you must upgrade to 1.15.0. This new version
of JTS is now dual-licensed to include a BSD style license.
* SOLR-12011: Replicas which are not up-to-date are not allowed to become leader. Use FORCELEADER API to
allow these replicas become leader.
New Features
----------------------
* SOLR-11285: Simulation framework for autoscaling. (ab)
@ -236,6 +239,8 @@ Bug Fixes
Also changed the display label in the Admin UI from routerField to router.field to match the actual API.
(Shawn Heisey via Cassandra Targett)
* SOLR-12011: Consistence problem when in-sync replicas are DOWN. (Cao Manh Dat)
Optimizations
----------------------

View File

@ -42,7 +42,7 @@ public class CloudDescriptor {
// set to true once a core has registered in zk
// set to false on detecting a session expiration
private volatile boolean hasRegistered = false;
volatile Replica.State lastPublished = Replica.State.ACTIVE;
private volatile Replica.State lastPublished = Replica.State.ACTIVE;
public static final String NUM_SHARDS = "numShards";

View File

@ -20,9 +20,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -359,13 +360,19 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
// should I be leader?
if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
&& !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
rejoinLeaderElection(core);
return;
}
if (isClosed) {
return;
}
log.info("I may be the new leader - try and sync");
// we are going to attempt to be the leader
@ -516,8 +523,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
zkStateReader.forceUpdateCollection(collection);
ClusterState clusterState = zkStateReader.getClusterState();
Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
if (rep != null && rep.getState() != Replica.State.ACTIVE
&& rep.getState() != Replica.State.RECOVERING) {
if (rep != null && rep.getState() != Replica.State.ACTIVE) {
log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
}
@ -593,34 +599,33 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
if (replicas != null && replicas.size() > 0) {
// set of replicas which is running in new LIR but lirState=DOWN
Set<String> replicasMustBeInLowerTerm = new HashSet<>();
for (String replicaCoreNodeName : replicas) {
if (coreNodeName.equals(replicaCoreNodeName))
continue; // added safe-guard so we don't mark this core as down
if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
// the replica registered its term so it is running with the new LIR implementation
// we can put this replica into recovery by increase our terms
zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));
continue;
}
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
+ lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
List<ZkCoreNodeProps> replicaProps =
zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName);
List<Replica> replicasProps =
zkController.getZkStateReader().getClusterState().getCollection(collection)
.getSlice(shardId).getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
if (replicaProps != null && replicaProps.size() > 0) {
if (replicasProps != null && replicasProps.size() > 0) {
ZkCoreNodeProps coreNodeProps = null;
for (ZkCoreNodeProps p : replicaProps) {
if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) {
coreNodeProps = p;
for (Replica p : replicasProps) {
if (p.getName().equals(replicaCoreNodeName)) {
coreNodeProps = new ZkCoreNodeProps(p);
break;
}
}
if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
replicasMustBeInLowerTerm.add(replicaCoreNodeName);
} else {
zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
collection, shardId, coreNodeProps, core.getCoreDescriptor(),
false /* forcePublishState */);
@ -628,6 +633,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
}
// these replicas registered their terms so it is running with the new LIR implementation
// we can put this replica into recovery by increase our terms
zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, replicasMustBeInLowerTerm);
}
} // core gets closed automagically
}
@ -741,39 +750,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
leaderElector.joinElection(this, true);
}
private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
log.debug("Checking if I should try and be the leader.");
if (isClosed) {
log.debug("Bailing on leader process because we have been closed");
return false;
}
if (!weAreReplacement) {
// we are the first node starting in the shard - there is a configurable wait
// to make sure others participate in sync and leader election, we can be leader
return true;
}
String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
&& !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
return false;
}
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
log.debug("My last published State was Active, it's okay to be the leader.");
return true;
}
log.debug("My last published State was "
+ core.getCoreDescriptor().getCloudDescriptor().getLastPublished()
+ ", I won't be the leader.");
// TODO: and if no one is a good candidate?
return false;
}
}
final class OverseerElectionContext extends ElectionContext {

View File

@ -48,7 +48,7 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
if (terms.canBecomeLeader(coreNodeName)) return true;
if (terms.haveHighestTermValue(coreNodeName)) return true;
if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
lastTermDoRecovery.set(terms.getTerm(coreNodeName));

View File

@ -1045,7 +1045,7 @@ public class ZkController {
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
if (isRunningInNewLIR) {
if (isRunningInNewLIR && cloudDesc.getReplicaType() != Type.PULL) {
shardTerms.registerTerm(coreZkNodeName);
}
String shardId = cloudDesc.getShardId();
@ -1455,13 +1455,20 @@ public class ZkController {
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new"));
if (state == Replica.State.RECOVERING && isRunningInNewLIR) {
getShardTerms(collection, shardId).setEqualsToMax(coreNodeName);
// pull replicas are excluded because their terms are not considered
if (state == Replica.State.RECOVERING && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
// state is used by client, state of replica can change from RECOVERING to DOWN without needed to finish recovery
// by calling this we will know that a replica actually finished recovery or not
getShardTerms(collection, shardId).startRecovering(coreNodeName);
}
if (state == Replica.State.ACTIVE && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
getShardTerms(collection, shardId).doneRecovering(coreNodeName);
}
ZkNodeProps m = new ZkNodeProps(props);
if (updateLastState) {
cd.getCloudDescriptor().lastPublished = state;
cd.getCloudDescriptor().setLastPublished(state);
}
overseerJobQueue.offer(Utils.toJSON(m));
} finally {

View File

@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
@ -100,6 +99,8 @@ public class ZkShardTerms implements AutoCloseable{
* @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
*/
public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
if (replicasNeedingRecovery.isEmpty()) return;
Terms newTerms;
while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
if (forceSaveTerms(newTerms)) return;
@ -107,7 +108,7 @@ public class ZkShardTerms implements AutoCloseable{
}
/**
* Can this replica become leader or is this replica's term equals to leader's term?
* Can this replica become leader?
* @param coreNodeName of the replica
* @return true if this replica can become leader, false if otherwise
*/
@ -115,6 +116,15 @@ public class ZkShardTerms implements AutoCloseable{
return terms.canBecomeLeader(coreNodeName);
}
/**
* Should leader skip sending updates to this replica?
* @param coreNodeName of the replica
* @return true if this replica has term equals to leader's term, false if otherwise
*/
public boolean skipSendingUpdatesTo(String coreNodeName) {
return !terms.haveHighestTermValue(coreNodeName);
}
/**
* Did this replica registered its term? This is a sign to check f
* @param coreNodeName of the replica
@ -184,16 +194,59 @@ public class ZkShardTerms implements AutoCloseable{
}
/**
* Set a replica's term equals to leader's term
* Set a replica's term equals to leader's term.
* This call should only be used by {@link org.apache.solr.common.params.CollectionParams.CollectionAction#FORCELEADER}
* @param coreNodeName of the replica
*/
public void setEqualsToMax(String coreNodeName) {
public void setTermEqualsToLeader(String coreNodeName) {
Terms newTerms;
while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
public void setTermToZero(String coreNodeName) {
Terms newTerms;
while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
/**
* Mark {@code coreNodeName} as recovering
*/
public void startRecovering(String coreNodeName) {
Terms newTerms;
while ( (newTerms = terms.startRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
/**
* Mark {@code coreNodeName} as finished recovering
*/
public void doneRecovering(String coreNodeName) {
Terms newTerms;
while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
/**
* When first updates come in, all replicas have some data now,
* so we must switch from term 0 (registered) to 1 (have some data)
*/
public void ensureHighestTermsAreNotZero() {
Terms newTerms;
while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
public long getHighestTerm() {
return terms.getMaxTerm();
}
public long getTerm(String coreNodeName) {
Long term = terms.getTerm(coreNodeName);
return term == null? -1 : term;
@ -232,6 +285,7 @@ public class ZkShardTerms implements AutoCloseable{
try {
Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
setNewTerms(new Terms(newTerms.values, stat.getVersion()));
log.info("Successful update terms at {} to {}", znodePath, newTerms);
return true;
} catch (KeeperException.BadVersionException e) {
log.info("Failed to save terms, version is not match, retrying");
@ -367,6 +421,7 @@ public class ZkShardTerms implements AutoCloseable{
*/
static class Terms {
private final Map<String, Long> values;
private final long maxTerm;
// ZK node version
private final int version;
@ -377,14 +432,25 @@ public class ZkShardTerms implements AutoCloseable{
public Terms(Map<String, Long> values, int version) {
this.values = values;
this.version = version;
if (values.isEmpty()) this.maxTerm = 0;
else this.maxTerm = Collections.max(values.values());
}
/**
* Can this replica become leader or is this replica's term equals to leader's term?
* Can {@code coreNodeName} become leader?
* @param coreNodeName of the replica
* @return true if this replica can become leader, false if otherwise
* @return true if {@code coreNodeName} can become leader, false if otherwise
*/
boolean canBecomeLeader(String coreNodeName) {
return haveHighestTermValue(coreNodeName) && !values.containsKey(coreNodeName + "_recovering");
}
/**
* Is {@code coreNodeName}'s term highest?
* @param coreNodeName of the replica
* @return true if term of {@code coreNodeName} is highest
*/
boolean haveHighestTermValue(String coreNodeName) {
if (values.isEmpty()) return true;
long maxTerm = Collections.max(values.values());
return values.getOrDefault(coreNodeName, 0L) == maxTerm;
@ -427,6 +493,21 @@ public class ZkShardTerms implements AutoCloseable{
return new Terms(newValues, version);
}
/**
* Return a new {@link Terms} in which highest terms are not zero
* @return null if highest terms are already larger than zero
*/
Terms ensureHighestTermsAreNotZero() {
if (maxTerm > 0) return null;
else {
HashMap<String, Long> newValues = new HashMap<>(values);
for (String replica : values.keySet()) {
newValues.put(replica, 1L);
}
return new Terms(newValues, version);
}
}
/**
* Return a new {@link Terms} in which term of {@code coreNodeName} is removed
* @param coreNodeName of the replica
@ -453,23 +534,70 @@ public class ZkShardTerms implements AutoCloseable{
return new Terms(newValues, version);
}
Terms setTermToZero(String coreNodeName) {
if (values.getOrDefault(coreNodeName, -1L) == 0) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, 0L);
return new Terms(newValues, version);
}
/**
* Return a new {@link Terms} in which the term of {@code coreNodeName} is max
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already maximum
*/
Terms setEqualsToMax(String coreNodeName) {
long maxTerm;
try {
maxTerm = Collections.max(values.values());
} catch (NoSuchElementException e){
maxTerm = 0;
}
Terms setTermEqualsToLeader(String coreNodeName) {
long maxTerm = getMaxTerm();
if (values.get(coreNodeName) == maxTerm) return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, maxTerm);
return new Terms(newValues, version);
}
long getMaxTerm() {
return maxTerm;
}
/**
* Mark {@code coreNodeName} as recovering
* @param coreNodeName of the replica
* @return null if {@code coreNodeName} is already marked as doing recovering
*/
Terms startRecovering(String coreNodeName) {
long maxTerm = getMaxTerm();
if (values.get(coreNodeName) == maxTerm && values.getOrDefault(coreNodeName+"_recovering", -1L) == maxTerm)
return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, maxTerm);
newValues.put(coreNodeName+"_recovering", maxTerm);
return new Terms(newValues, version);
}
/**
* Mark {@code coreNodeName} as finished recovering
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already finished doing recovering
*/
Terms doneRecovering(String coreNodeName) {
if (!values.containsKey(coreNodeName+"_recovering")) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(coreNodeName+"_recovering");
return new Terms(newValues, version);
}
@Override
public String toString() {
return "Terms{" +
"values=" + values +
", version=" + version +
'}';
}
}
}

View File

@ -1132,6 +1132,8 @@ public class CoreContainer {
if (leader != null && leader.getState() == State.ACTIVE) {
log.info("Found active leader, will attempt to create fresh core and recover.");
resetIndexDirectory(dcore, coreConfig);
// the index of this core is emptied, its term should be set to 0
getZkController().getShardTerms(desc.getCollectionName(), desc.getShardId()).setTermToZero(desc.getCoreNodeName());
return new SolrCore(this, dcore, coreConfig);
}
} catch (SolrException se) {

View File

@ -1149,7 +1149,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
if (optionalMaxTerm.isPresent()) {
liveReplicas.stream()
.filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
.forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
.forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName()));
}
}

View File

@ -28,7 +28,6 @@ import org.apache.lucene.store.Directory;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
@ -234,16 +233,6 @@ enum CoreAdminOperation implements CoreAdminOp {
if (cname == null) {
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
}
try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
// Setting the last published state for this core to be ACTIVE
if (core != null) {
core.getCoreDescriptor().getCloudDescriptor().setLastPublished(Replica.State.ACTIVE);
log().info("Setting the last published state for this core, {}, to {}", core.getName(), Replica.State.ACTIVE);
} else {
SolrException.log(log(), "Could not find core: " + cname);
}
}
}),
BACKUPCORE_OP(BACKUPCORE, new BackupCoreOp()),

View File

@ -127,7 +127,10 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
// if the replica is waiting for leader to see recovery state, the leader should refresh its terms
if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) {
if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && shardTerms.skipSendingUpdatesTo(coreNodeName)) {
// The replica changed it term, then published itself as RECOVERING.
// This core already see replica as RECOVERING
// so it is guarantees that a live-fetch will be enough for this core to see max term published
shardTerms.refreshTerms();
}

View File

@ -20,8 +20,10 @@ package org.apache.solr.handler.admin;
import java.net.URI;
import java.util.Optional;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.SolrCore;
@ -61,11 +63,22 @@ class RestoreCoreOp implements CoreAdminHandler.CoreAdminOp {
URI locationUri = repository.createURI(location);
try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
CloudDescriptor cd = core.getCoreDescriptor().getCloudDescriptor();
// this core must be the only replica in its shard otherwise
// we cannot guarantee consistency between replicas because when we add data (or restore index) to this replica
Slice slice = zkController.getClusterState().getCollection(cd.getCollectionName()).getSlice(cd.getShardId());
if (slice.getReplicas().size() != 1) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Failed to restore core=" + core.getName() + ", the core must be the only replica in its shard");
}
RestoreCore restoreCore = new RestoreCore(repository, core, locationUri, name);
boolean success = restoreCore.doRestore();
if (!success) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to restore core=" + core.getName());
}
// other replicas to-be-created will know that they are out of date by
// looking at their term : 0 compare to term of this core : 1
zkController.getShardTerms(cd.getCollectionName(), cd.getShardId()).ensureHighestTermsAreNotZero();
}
}
}

View File

@ -23,6 +23,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@ -111,6 +113,16 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
SolrCore newcore = it.handler.coreContainer.getCore(newCoreName);
if (newcore != null) {
newCores.add(newcore);
if (it.handler.coreContainer.isZooKeeperAware()) {
// this core must be the only replica in its shard otherwise
// we cannot guarantee consistency between replicas because when we add data to this replica
CloudDescriptor cd = newcore.getCoreDescriptor().getCloudDescriptor();
ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
if (clusterState.getCollection(cd.getCollectionName()).getSlice(cd.getShardId()).getReplicas().size() != 1) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Core with core name " + newCoreName + " must be the only replica in shard " + cd.getShardId());
}
}
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Core with core name " + newCoreName + " expected but doesn't exist.");
}
@ -123,6 +135,15 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName, splitKey);
core.getUpdateHandler().split(cmd);
if (it.handler.coreContainer.isZooKeeperAware()) {
for (SolrCore newcore : newCores) {
// the index of the core changed from empty to have some data, its term must be not zero
CloudDescriptor cd = newcore.getCoreDescriptor().getCloudDescriptor();
ZkShardTerms zkShardTerms = it.handler.coreContainer.getZkController().getShardTerms(cd.getCollectionName(), cd.getShardId());
zkShardTerms.ensureHighestTermsAreNotZero();
}
}
// After the split has completed, someone (here?) should start the process of replaying the buffered updates.
} catch (Exception e) {

View File

@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -173,6 +172,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private boolean forwardToLeader = false;
private boolean isSubShardLeader = false;
private List<Node> nodes;
private Set<String> skippedCoreNodeNames;
private boolean isIndexChanged = false;
private UpdateCommand updateCommand; // the current command this processor is working on.
@ -334,9 +335,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
.getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
if (replicaProps == null) {
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
String leaderCoreNodeName = leaderReplica.getName();
List<Replica> replicas = clusterState.getCollection(collection)
.getSlice(shardId)
.getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName));
if (replicas.isEmpty()) {
return null;
}
@ -349,16 +354,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
}
List<Node> nodes = new ArrayList<>(replicaProps.size());
List<Node> nodes = new ArrayList<>(replicas.size());
skippedCoreNodeNames = new HashSet<>();
ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
for (ZkCoreNodeProps props : replicaProps) {
String coreNodeName = ((Replica) props.getNodeProps()).getName();
if (skipList != null && skipListSet.contains(props.getCoreUrl())) {
log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true");
} else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
log.info("skip url:{} cause its term is less than leader", props.getCoreUrl());
for (Replica replica: replicas) {
String coreNodeName = replica.getName();
if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true");
} else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
skippedCoreNodeNames.add(replica.getName());
} else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
skippedCoreNodeNames.add(replica.getName());
} else {
nodes.add(new StdNode(props, collection, shardId));
nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId));
}
}
return nodes;
@ -750,6 +759,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO: optionally fail if n replicas are not reached...
private void doFinish() {
boolean shouldUpdateTerms = isLeader && !isOldLIRMode && isIndexChanged;
if (shouldUpdateTerms) {
ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
if (skippedCoreNodeNames != null) {
zkShardTerms.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), skippedCoreNodeNames);
}
zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
}
// TODO: if not a forward and replication req is not specified, we could
// send in a background thread
@ -758,7 +775,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO - we may need to tell about more than one error...
List<Error> errorsForClient = new ArrayList<>(errors.size());
Map<ShardInfo, Set<String>> failedReplicas = new HashMap<>();
Set<String> replicasShouldBeInLowerTerms = new HashSet<>();
for (final SolrCmdDistributor.Error error : errors) {
if (error.req.node instanceof RetryNode) {
@ -856,9 +873,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
Throwable rootCause = SolrException.getRootCause(error.e);
if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
failedReplicas.get(shardInfo).add(coreNodeName);
replicasShouldBeInLowerTerms.add(coreNodeName);
} else {
// The replica did not registered its term, so it must run with old LIR implementation
log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
@ -891,11 +906,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
if (!isOldLIRMode) {
for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
ShardInfo shardInfo = entry.getKey();
zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
}
if (!isOldLIRMode && !replicasShouldBeInLowerTerms.isEmpty()) {
zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId())
.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms);
}
// in either case, we need to attach the achieved and min rf to the response.
if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
@ -929,47 +942,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
private class ShardInfo {
private String collection;
private String shard;
private String leader;
public ShardInfo(String collection, String shard, String leader) {
this.collection = collection;
this.shard = shard;
this.leader = leader;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardInfo shardInfo = (ShardInfo) o;
if (!collection.equals(shardInfo.collection)) return false;
if (!shard.equals(shardInfo.shard)) return false;
return leader.equals(shardInfo.leader);
}
@Override
public int hashCode() {
int result = collection.hashCode();
result = 31 * result + shard.hashCode();
result = 31 * result + leader.hashCode();
return result;
}
}
// must be synchronized by bucket
private void doLocalAdd(AddUpdateCommand cmd) throws IOException {
super.processAdd(cmd);
isIndexChanged = true;
}
// must be synchronized by bucket
private void doLocalDelete(DeleteUpdateCommand cmd) throws IOException {
super.processDelete(cmd);
isIndexChanged = true;
}
/**

View File

@ -0,0 +1,276 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestCloudConsistency extends SolrCloudTestCase {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static Map<JettySolrRunner, SocketProxy> proxies;
private static Map<URI, JettySolrRunner> jettys;
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
configureCluster(4)
.addConfig("conf", configset("cloud-minimal"))
.configure();
// Add proxies
proxies = new HashMap<>(cluster.getJettySolrRunners().size());
jettys = new HashMap<>();
for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
SocketProxy proxy = new SocketProxy();
jetty.setProxyPort(proxy.getListenPort());
cluster.stopJettySolrRunner(jetty);//TODO: Can we avoid this restart
cluster.startJettySolrRunner(jetty);
proxy.open(jetty.getBaseUrl().toURI());
LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
proxies.put(jetty, proxy);
jettys.put(proxy.getUrl(), jetty);
}
}
@AfterClass
public static void tearDownCluster() throws Exception {
for (SocketProxy proxy:proxies.values()) {
proxy.close();
}
proxies = null;
jettys = null;
}
@Test
public void testOutOfSyncReplicasCannotBecomeLeader() throws Exception {
testOutOfSyncReplicasCannotBecomeLeader(false);
}
@Test
public void testOutOfSyncReplicasCannotBecomeLeaderAfterRestart() throws Exception {
testOutOfSyncReplicasCannotBecomeLeader(true);
}
public void testOutOfSyncReplicasCannotBecomeLeader(boolean onRestart) throws Exception {
final String collectionName = "outOfSyncReplicasCannotBecomeLeader-"+onRestart;
CollectionAdminRequest.createCollection(collectionName, 1, 3)
.setCreateNodeSet("")
.process(cluster.getSolrClient());
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
waitForState("Timeout waiting for shard leader", collectionName, clusterShape(1, 1));
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
.setNode(cluster.getJettySolrRunner(1).getNodeName())
.process(cluster.getSolrClient());
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
.setNode(cluster.getJettySolrRunner(2).getNodeName())
.process(cluster.getSolrClient());
waitForState("Timeout waiting for 1x3 collection", collectionName, clusterShape(1, 3));
addDocs(collectionName, 3, 1);
final Replica oldLeader = getCollectionState(collectionName).getSlice("shard1").getLeader();
assertEquals(cluster.getJettySolrRunner(0).getNodeName(), oldLeader.getNodeName());
if (onRestart) {
addDocToWhenOtherReplicasAreDown(collectionName, oldLeader, 4);
} else {
addDocWhenOtherReplicasAreNetworkPartitioned(collectionName, oldLeader, 4);
}
assertDocsExistInAllReplicas(getCollectionState(collectionName).getReplicas(), collectionName, 1, 4);
CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
}
/**
* Adding doc when replicas (not leader) are down,
* These replicas are out-of-sync hence they should not become leader even when current leader is DOWN.
* Leader should be on node - 0
*/
private void addDocToWhenOtherReplicasAreDown(String collection, Replica leader, int docId) throws Exception {
ChaosMonkey.stop(cluster.getJettySolrRunner(1));
ChaosMonkey.stop(cluster.getJettySolrRunner(2));
waitForState("", collection, (liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplicas().stream()
.filter(replica -> replica.getState() == Replica.State.DOWN).count() == 2);
addDocs(collection, 1, docId);
ChaosMonkey.stop(cluster.getJettySolrRunner(0));
waitForState("", collection, (liveNodes, collectionState) -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
ChaosMonkey.start(cluster.getJettySolrRunner(1));
ChaosMonkey.start(cluster.getJettySolrRunner(2));
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.CURRENT_TIME);
while (!timeOut.hasTimedOut()) {
Replica newLeader = getCollectionState(collection).getSlice("shard1").getLeader();
if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
fail("Out of sync replica became leader " + newLeader);
}
}
ChaosMonkey.start(cluster.getJettySolrRunner(0));
waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
Replica newLeader = collectionState.getLeader("shard1");
return newLeader != null && newLeader.getName().equals(leader.getName());
});
waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3));
}
/**
* Adding doc when replicas (not leader) are network partitioned with leader,
* These replicas are out-of-sync hence they should not become leader even when current leader is DOWN.
* Leader should be on node - 0
*/
private void addDocWhenOtherReplicasAreNetworkPartitioned(String collection, Replica leader, int docId) throws Exception {
for (int i = 0; i < 3; i++) {
proxies.get(cluster.getJettySolrRunner(i)).close();
}
addDoc(collection, docId, cluster.getJettySolrRunner(0));
ChaosMonkey.stop(cluster.getJettySolrRunner(0));
for (int i = 1; i < 3; i++) {
proxies.get(cluster.getJettySolrRunner(i)).reopen();
}
waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState)
-> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.CURRENT_TIME);
while (!timeOut.hasTimedOut()) {
Replica newLeader = getCollectionState(collection).getLeader("shard1");
if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
fail("Out of sync replica became leader " + newLeader);
}
}
proxies.get(cluster.getJettySolrRunner(0)).reopen();
ChaosMonkey.start(cluster.getJettySolrRunner(0));
waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
Replica newLeader = collectionState.getLeader("shard1");
return newLeader != null && newLeader.getName().equals(leader.getName());
});
waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3));
}
private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException {
List<SolrInputDocument> docs = new ArrayList<>(numDocs);
for (int i = 0; i < numDocs; i++) {
int id = startId + i;
docs.add(new SolrInputDocument("id", String.valueOf(id), "fieldName_s", String.valueOf(id)));
}
cluster.getSolrClient().add(collection, docs);
cluster.getSolrClient().commit(collection);
}
private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) {
solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId)));
}
}
private void assertDocsExistInAllReplicas(List<Replica> notLeaders,
String testCollectionName, int firstDocId, int lastDocId) throws Exception {
Replica leader =
cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName);
List<HttpSolrClient> replicas =
new ArrayList<HttpSolrClient>(notLeaders.size());
for (Replica r : notLeaders) {
replicas.add(getHttpSolrClient(r, testCollectionName));
}
try {
for (int d = firstDocId; d <= lastDocId; d++) {
String docId = String.valueOf(d);
assertDocExists(leaderSolr, testCollectionName, docId);
for (HttpSolrClient replicaSolr : replicas) {
assertDocExists(replicaSolr, testCollectionName, docId);
}
}
} finally {
if (leaderSolr != null) {
leaderSolr.close();
}
for (HttpSolrClient replicaSolr : replicas) {
replicaSolr.close();
}
}
}
private void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception {
NamedList rsp = realTimeGetDocId(solr, docId);
String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId);
assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+ " due to: " + match + "; rsp="+rsp, match == null);
}
private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
return solr.request(qr);
}
protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
String url = zkProps.getBaseUrl() + "/" + coll;
return getHttpSolrClient(url);
}
protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
URL baseUrl = new URL(replicaBaseUrl);
JettySolrRunner proxy = jettys.get(baseUrl.toURI());
assertNotNull("No proxy found for " + baseUrl + "!", proxy);
return proxy;
}
}

View File

@ -425,10 +425,19 @@ public class TestPullReplica extends SolrCloudTestCase {
Replica pullReplica = docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
assertTrue(pullReplica.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes()));
long highestTerm = 0L;
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) {
highestTerm = zkShardTerms.getHighestTerm();
}
// add document, this should fail since there is no leader. Pull replica should not accept the update
expectThrows(SolrException.class, () ->
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"))
);
if (removeReplica) {
try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) {
assertEquals(highestTerm, zkShardTerms.getHighestTerm());
}
}
// Also fails if I send the update to the pull replica explicitly
try (HttpSolrClient pullReplicaClient = getHttpSolrClient(docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
@ -436,6 +445,11 @@ public class TestPullReplica extends SolrCloudTestCase {
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"))
);
}
if (removeReplica) {
try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) {
assertEquals(highestTerm, zkShardTerms.getHighestTerm());
}
}
// Queries should still work
waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)));

View File

@ -94,7 +94,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
assertEquals(1L, rep1Terms.getTerm("rep1"));
waitFor(1L, () -> rep2Terms.getTerm("rep1"));
rep2Terms.setEqualsToMax("rep2");
rep2Terms.setTermEqualsToLeader("rep2");
assertEquals(1L, rep2Terms.getTerm("rep2"));
rep2Terms.registerTerm("rep2");
assertEquals(1L, rep2Terms.getTerm("rep2"));
@ -138,7 +138,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
while (!stop.get()) {
try {
Thread.sleep(random().nextInt(200));
zkShardTerms.setEqualsToMax(replica);
zkShardTerms.setTermEqualsToLeader(replica);
} catch (InterruptedException e) {
e.printStackTrace();
}
@ -178,7 +178,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
waitFor(1, count::get);
leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
waitFor(2, count::get);
replicaTerms.setEqualsToMax("replica");
replicaTerms.setTermEqualsToLeader("replica");
waitFor(3, count::get);
assertEquals(0, replicaTerms.getNumListeners());
@ -194,6 +194,41 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
assertEquals(1L, terms.getTerm("leader").longValue());
}
public void testSetTermToZero() {
String collection = "setTermToZero";
ZkShardTerms terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
terms.registerTerm("leader");
terms.registerTerm("replica");
terms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
assertEquals(1L, terms.getTerm("leader"));
terms.setTermToZero("leader");
assertEquals(0L, terms.getTerm("leader"));
terms.close();
}
public void testReplicaCanBecomeLeader() throws InterruptedException {
String collection = "replicaCanBecomeLeader";
ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
leaderTerms.registerTerm("leader");
replicaTerms.registerTerm("replica");
leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
waitFor(false, () -> replicaTerms.canBecomeLeader("replica"));
waitFor(true, () -> leaderTerms.skipSendingUpdatesTo("replica"));
replicaTerms.startRecovering("replica");
waitFor(false, () -> replicaTerms.canBecomeLeader("replica"));
waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica"));
replicaTerms.doneRecovering("replica");
waitFor(true, () -> replicaTerms.canBecomeLeader("replica"));
waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica"));
leaderTerms.close();
replicaTerms.close();
}
private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException {
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
while (!timeOut.hasTimedOut()) {