diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 7b4f878e225..e0ef777b630 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -78,6 +78,9 @@ Upgrade Notes * LUCENE-8161: If you are using the spatial JTS library with Solr, you must upgrade to 1.15.0. This new version of JTS is now dual-licensed to include a BSD style license. +* SOLR-12011: Replicas which are not up-to-date are not allowed to become leader. Use the FORCELEADER API to + allow these replicas to become leader. + New Features ---------------------- * SOLR-11285: Simulation framework for autoscaling. (ab) @@ -236,6 +239,8 @@ Bug Fixes Also changed the display label in the Admin UI from routerField to router.field to match the actual API. (Shawn Heisey via Cassandra Targett) +* SOLR-12011: Consistency problem when in-sync replicas are DOWN. (Cao Manh Dat) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java index 32cb65bde48..068191eb5fe 100644 --- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java +++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java @@ -42,7 +42,7 @@ public class CloudDescriptor { // set to true once a core has registered in zk // set to false on detecting a session expiration private volatile boolean hasRegistered = false; - volatile Replica.State lastPublished = Replica.State.ACTIVE; + private volatile Replica.State lastPublished = Replica.State.ACTIVE; public static final String NUM_SHARDS = "numShards"; diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 2d00151b53a..3ba4dfce939 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -20,9 +20,10 @@ import java.io.Closeable; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -359,12 +360,18 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType(); - + String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); // should I be leader?
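The SOLR-12011 upgrade note and the ElectionContext change that follows both rest on the per-shard term map that ZkShardTerms keeps in ZooKeeper. The following is a minimal, self-contained sketch of the eligibility rule this patch enforces; plain java.util maps stand in for the ZooKeeper data, and the class and method names are illustrative, not Solr APIs.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class LeaderEligibilitySketch {
    // Per-shard term map as stored in ZooKeeper, e.g. {"core_node1": 2, "core_node2": 1}
    static boolean canBecomeLeader(Map<String, Long> terms, String coreNodeName) {
        if (terms.isEmpty()) return true;                        // nothing registered yet
        long max = Collections.max(terms.values());
        boolean highest = terms.getOrDefault(coreNodeName, 0L) == max;
        boolean recovering = terms.containsKey(coreNodeName + "_recovering");
        return highest && !recovering;                           // up to date and finished recovering
    }

    public static void main(String[] args) {
        Map<String, Long> terms = new HashMap<>();
        terms.put("core_node1", 2L);   // leader that accepted updates while core_node2 was down
        terms.put("core_node2", 1L);   // lagging replica
        System.out.println(canBecomeLeader(terms, "core_node2")); // false -> must rejoin election
        System.out.println(canBecomeLeader(terms, "core_node1")); // true
    }
}

A replica whose term trails the shard maximum rejoins the election instead of syncing its way into leadership; the FORCELEADER API remains the manual override.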
- if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) { + if (zkController.getShardTerms(collection, shardId).registered(coreNodeName) + && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) { + log.info("Can't become leader, term of replica {} less than leader", coreNodeName); rejoinLeaderElection(core); return; } + + if (isClosed) { + return; + } log.info("I may be the new leader - try and sync"); @@ -516,8 +523,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { zkStateReader.forceUpdateCollection(collection); ClusterState clusterState = zkStateReader.getClusterState(); Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP)); - if (rep != null && rep.getState() != Replica.State.ACTIVE - && rep.getState() != Replica.State.RECOVERING) { + if (rep != null && rep.getState() != Replica.State.ACTIVE) { log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE"); zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE); } @@ -593,40 +599,43 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } if (replicas != null && replicas.size() > 0) { + // set of replicas which is running in new LIR but lirState=DOWN + Set replicasMustBeInLowerTerm = new HashSet<>(); for (String replicaCoreNodeName : replicas) { if (coreNodeName.equals(replicaCoreNodeName)) continue; // added safe-guard so we don't mark this core as down - if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) { - // the replica registered its term so it is running with the new LIR implementation - // we can put this replica into recovery by increase our terms - zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName)); - continue; - } - final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName); if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) { log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: " + lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName); - List replicaProps = - zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName); + List replicasProps = + zkController.getZkStateReader().getClusterState().getCollection(collection) + .getSlice(shardId).getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); - if (replicaProps != null && replicaProps.size() > 0) { + if (replicasProps != null && replicasProps.size() > 0) { ZkCoreNodeProps coreNodeProps = null; - for (ZkCoreNodeProps p : replicaProps) { - if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) { - coreNodeProps = p; + for (Replica p : replicasProps) { + if (p.getName().equals(replicaCoreNodeName)) { + coreNodeProps = new ZkCoreNodeProps(p); break; } } - zkController.ensureReplicaInLeaderInitiatedRecovery(cc, - collection, shardId, coreNodeProps, core.getCoreDescriptor(), - false /* forcePublishState */); + if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) { + replicasMustBeInLowerTerm.add(replicaCoreNodeName); + } else { + zkController.ensureReplicaInLeaderInitiatedRecovery(cc, + collection, shardId, coreNodeProps, core.getCoreDescriptor(), + false /* forcePublishState */); + } } } } + // 
these replicas registered their terms so it is running with the new LIR implementation + // we can put this replica into recovery by increase our terms + zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, replicasMustBeInLowerTerm); } } // core gets closed automagically } @@ -741,39 +750,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { leaderElector.joinElection(this, true); } - private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) { - log.debug("Checking if I should try and be the leader."); - - if (isClosed) { - log.debug("Bailing on leader process because we have been closed"); - return false; - } - - if (!weAreReplacement) { - // we are the first node starting in the shard - there is a configurable wait - // to make sure others participate in sync and leader election, we can be leader - return true; - } - - String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); - if (zkController.getShardTerms(collection, shardId).registered(coreNodeName) - && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) { - log.info("Can't become leader, term of replica {} less than leader", coreNodeName); - return false; - } - - if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) { - log.debug("My last published State was Active, it's okay to be the leader."); - return true; - } - log.debug("My last published State was " - + core.getCoreDescriptor().getCloudDescriptor().getLastPublished() - + ", I won't be the leader."); - // TODO: and if no one is a good candidate? - - return false; - } - } final class OverseerElectionContext extends ElectionContext { diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java index 26fec974a1f..90a500aec29 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java @@ -48,7 +48,7 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher { if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true; String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); - if (terms.canBecomeLeader(coreNodeName)) return true; + if (terms.haveHighestTermValue(coreNodeName)) return true; if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) { log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName); lastTermDoRecovery.set(terms.getTerm(coreNodeName)); diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index cb1fcea5ba8..a159db5c5b2 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1045,7 +1045,7 @@ public class ZkController { // This flag is used for testing rolling updates and should be removed in SOLR-11812 boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new")); - if (isRunningInNewLIR) { + if (isRunningInNewLIR && cloudDesc.getReplicaType() != Type.PULL) { shardTerms.registerTerm(coreZkNodeName); } String shardId = cloudDesc.getShardId(); @@ -1455,13 +1455,20 @@ public class ZkController { // This flag is used for testing rolling 
updates and should be removed in SOLR-11812 boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new")); - if (state == Replica.State.RECOVERING && isRunningInNewLIR) { - getShardTerms(collection, shardId).setEqualsToMax(coreNodeName); + // pull replicas are excluded because their terms are not considered + if (state == Replica.State.RECOVERING && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) { + // state is used by client, state of replica can change from RECOVERING to DOWN without needed to finish recovery + // by calling this we will know that a replica actually finished recovery or not + getShardTerms(collection, shardId).startRecovering(coreNodeName); } + if (state == Replica.State.ACTIVE && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) { + getShardTerms(collection, shardId).doneRecovering(coreNodeName); + } + ZkNodeProps m = new ZkNodeProps(props); if (updateLastState) { - cd.getCloudDescriptor().lastPublished = state; + cd.getCloudDescriptor().setLastPublished(state); } overseerJobQueue.offer(Utils.toJSON(m)); } finally { diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java index 7dc0d575627..50b424c083f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeoutException; @@ -100,6 +99,8 @@ public class ZkShardTerms implements AutoCloseable{ * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term */ public void ensureTermsIsHigher(String leader, Set replicasNeedingRecovery) { + if (replicasNeedingRecovery.isEmpty()) return; + Terms newTerms; while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) { if (forceSaveTerms(newTerms)) return; @@ -107,7 +108,7 @@ public class ZkShardTerms implements AutoCloseable{ } /** - * Can this replica become leader or is this replica's term equals to leader's term? + * Can this replica become leader? * @param coreNodeName of the replica * @return true if this replica can become leader, false if otherwise */ @@ -115,6 +116,15 @@ public class ZkShardTerms implements AutoCloseable{ return terms.canBecomeLeader(coreNodeName); } + /** + * Should leader skip sending updates to this replica? + * @param coreNodeName of the replica + * @return true if this replica has term equals to leader's term, false if otherwise + */ + public boolean skipSendingUpdatesTo(String coreNodeName) { + return !terms.haveHighestTermValue(coreNodeName); + } + /** * Did this replica registered its term? This is a sign to check f * @param coreNodeName of the replica @@ -184,16 +194,59 @@ public class ZkShardTerms implements AutoCloseable{ } /** - * Set a replica's term equals to leader's term + * Set a replica's term equals to leader's term. 
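The publish() changes above tie the term map to the recovery lifecycle: publishing RECOVERING records a "<coreNodeName>_recovering" marker at the leader's term, and only a later ACTIVE publish removes it, so a replica that dies mid-recovery stays ineligible for leadership. Below is a condensed, stand-alone rendering of that bookkeeping; the real Terms class is immutable and carries the ZooKeeper znode version, and these helper names are illustrative.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class RecoveryMarkerSketch {
    // startRecovering: raise the replica to the leader's term but leave a "<name>_recovering" marker
    static Map<String, Long> startRecovering(Map<String, Long> terms, String coreNodeName) {
        long max = terms.isEmpty() ? 0L : Collections.max(terms.values());
        Map<String, Long> next = new HashMap<>(terms);
        next.put(coreNodeName, max);
        next.put(coreNodeName + "_recovering", max);
        return next;
    }

    // doneRecovering: only an ACTIVE publish clears the marker
    static Map<String, Long> doneRecovering(Map<String, Long> terms, String coreNodeName) {
        Map<String, Long> next = new HashMap<>(terms);
        next.remove(coreNodeName + "_recovering");
        return next;
    }

    public static void main(String[] args) {
        Map<String, Long> terms = new HashMap<>();
        terms.put("core_node1", 2L);                   // leader
        terms.put("core_node2", 1L);                   // replica that must recover
        terms = startRecovering(terms, "core_node2");  // {core_node1=2, core_node2=2, core_node2_recovering=2}
        terms = doneRecovering(terms, "core_node2");   // {core_node1=2, core_node2=2}
        System.out.println(terms);
    }
}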
+ * This call should only be used by {@link org.apache.solr.common.params.CollectionParams.CollectionAction#FORCELEADER} * @param coreNodeName of the replica */ - public void setEqualsToMax(String coreNodeName) { + public void setTermEqualsToLeader(String coreNodeName) { Terms newTerms; - while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) { + while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } } + public void setTermToZero(String coreNodeName) { + Terms newTerms; + while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) { + if (forceSaveTerms(newTerms)) break; + } + } + + /** + * Mark {@code coreNodeName} as recovering + */ + public void startRecovering(String coreNodeName) { + Terms newTerms; + while ( (newTerms = terms.startRecovering(coreNodeName)) != null) { + if (forceSaveTerms(newTerms)) break; + } + } + + /** + * Mark {@code coreNodeName} as finished recovering + */ + public void doneRecovering(String coreNodeName) { + Terms newTerms; + while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) { + if (forceSaveTerms(newTerms)) break; + } + } + + /** + * When first updates come in, all replicas have some data now, + * so we must switch from term 0 (registered) to 1 (have some data) + */ + public void ensureHighestTermsAreNotZero() { + Terms newTerms; + while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) { + if (forceSaveTerms(newTerms)) break; + } + } + + public long getHighestTerm() { + return terms.getMaxTerm(); + } + public long getTerm(String coreNodeName) { Long term = terms.getTerm(coreNodeName); return term == null? -1 : term; @@ -232,6 +285,7 @@ public class ZkShardTerms implements AutoCloseable{ try { Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true); setNewTerms(new Terms(newTerms.values, stat.getVersion())); + log.info("Successful update terms at {} to {}", znodePath, newTerms); return true; } catch (KeeperException.BadVersionException e) { log.info("Failed to save terms, version is not match, retrying"); @@ -367,6 +421,7 @@ public class ZkShardTerms implements AutoCloseable{ */ static class Terms { private final Map values; + private final long maxTerm; // ZK node version private final int version; @@ -377,14 +432,25 @@ public class ZkShardTerms implements AutoCloseable{ public Terms(Map values, int version) { this.values = values; this.version = version; + if (values.isEmpty()) this.maxTerm = 0; + else this.maxTerm = Collections.max(values.values()); } /** - * Can this replica become leader or is this replica's term equals to leader's term? + * Can {@code coreNodeName} become leader? * @param coreNodeName of the replica - * @return true if this replica can become leader, false if otherwise + * @return true if {@code coreNodeName} can become leader, false if otherwise */ boolean canBecomeLeader(String coreNodeName) { + return haveHighestTermValue(coreNodeName) && !values.containsKey(coreNodeName + "_recovering"); + } + + /** + * Is {@code coreNodeName}'s term highest? 
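Every mutator above follows the same optimistic-concurrency pattern: compute a new Terms from the current snapshot, try to write it with the expected znode version, and retry from a fresh read if another node won the race (forceSaveTerms catching BadVersionException). A simplified, self-contained sketch of that loop for ensureTermsIsHigher follows, with an AtomicReference standing in for the ZooKeeper node; only the leader is bumped here, whereas the real code raises every in-sync replica that shares the leader's term.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

class EnsureTermsIsHigherSketch {
    static class Terms {
        final Map<String, Long> values;
        final int version;                 // mirrors the ZK znode version
        Terms(Map<String, Long> values, int version) { this.values = values; this.version = version; }
    }

    // Stand-in for the terms znode; the real code re-reads the node and retries on BadVersionException.
    static final AtomicReference<Terms> znode = new AtomicReference<>(new Terms(new HashMap<>(), 0));

    // Leader raises itself above the replicas that missed an update, so they must recover.
    static void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
        if (replicasNeedingRecovery.isEmpty()) return;           // same early return the patch adds
        while (true) {
            Terms current = znode.get();
            Map<String, Long> values = new HashMap<>(current.values);
            long newTerm = (values.isEmpty() ? 0L : Collections.max(values.values())) + 1;
            values.put(leader, newTerm);                         // lagging replicas keep their old, lower term
            Terms updated = new Terms(values, current.version + 1);
            if (znode.compareAndSet(current, updated)) return;   // "forceSaveTerms"; otherwise retry with fresh data
        }
    }

    public static void main(String[] args) {
        Map<String, Long> start = new HashMap<>();
        start.put("core_node1", 1L);
        start.put("core_node2", 1L);
        znode.set(new Terms(start, 3));
        ensureTermsIsHigher("core_node1", Collections.singleton("core_node2"));
        System.out.println(znode.get().values);   // {core_node1=2, core_node2=1}
    }
}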
+ * @param coreNodeName of the replica + * @return true if term of {@code coreNodeName} is highest + */ + boolean haveHighestTermValue(String coreNodeName) { if (values.isEmpty()) return true; long maxTerm = Collections.max(values.values()); return values.getOrDefault(coreNodeName, 0L) == maxTerm; @@ -427,6 +493,21 @@ public class ZkShardTerms implements AutoCloseable{ return new Terms(newValues, version); } + /** + * Return a new {@link Terms} in which highest terms are not zero + * @return null if highest terms are already larger than zero + */ + Terms ensureHighestTermsAreNotZero() { + if (maxTerm > 0) return null; + else { + HashMap newValues = new HashMap<>(values); + for (String replica : values.keySet()) { + newValues.put(replica, 1L); + } + return new Terms(newValues, version); + } + } + /** * Return a new {@link Terms} in which term of {@code coreNodeName} is removed * @param coreNodeName of the replica @@ -453,23 +534,70 @@ public class ZkShardTerms implements AutoCloseable{ return new Terms(newValues, version); } + Terms setTermToZero(String coreNodeName) { + if (values.getOrDefault(coreNodeName, -1L) == 0) { + return null; + } + HashMap newValues = new HashMap<>(values); + newValues.put(coreNodeName, 0L); + return new Terms(newValues, version); + } + /** * Return a new {@link Terms} in which the term of {@code coreNodeName} is max * @param coreNodeName of the replica * @return null if term of {@code coreNodeName} is already maximum */ - Terms setEqualsToMax(String coreNodeName) { - long maxTerm; - try { - maxTerm = Collections.max(values.values()); - } catch (NoSuchElementException e){ - maxTerm = 0; - } + Terms setTermEqualsToLeader(String coreNodeName) { + long maxTerm = getMaxTerm(); if (values.get(coreNodeName) == maxTerm) return null; HashMap newValues = new HashMap<>(values); newValues.put(coreNodeName, maxTerm); return new Terms(newValues, version); } + + long getMaxTerm() { + return maxTerm; + } + + /** + * Mark {@code coreNodeName} as recovering + * @param coreNodeName of the replica + * @return null if {@code coreNodeName} is already marked as doing recovering + */ + Terms startRecovering(String coreNodeName) { + long maxTerm = getMaxTerm(); + if (values.get(coreNodeName) == maxTerm && values.getOrDefault(coreNodeName+"_recovering", -1L) == maxTerm) + return null; + + HashMap newValues = new HashMap<>(values); + newValues.put(coreNodeName, maxTerm); + newValues.put(coreNodeName+"_recovering", maxTerm); + return new Terms(newValues, version); + } + + /** + * Mark {@code coreNodeName} as finished recovering + * @param coreNodeName of the replica + * @return null if term of {@code coreNodeName} is already finished doing recovering + */ + Terms doneRecovering(String coreNodeName) { + if (!values.containsKey(coreNodeName+"_recovering")) { + return null; + } + + HashMap newValues = new HashMap<>(values); + newValues.remove(coreNodeName+"_recovering"); + return new Terms(newValues, version); + } + + @Override + public String toString() { + return "Terms{" + + "values=" + values + + ", version=" + version + + '}'; + } } } diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index bf25d69077b..c73507154de 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -1132,6 +1132,8 @@ public class CoreContainer { if (leader != null && leader.getState() == State.ACTIVE) { log.info("Found active leader, will attempt to 
create fresh core and recover."); resetIndexDirectory(dcore, coreConfig); + // the index of this core is emptied, its term should be set to 0 + getZkController().getShardTerms(desc.getCollectionName(), desc.getShardId()).setTermToZero(desc.getCoreNodeName()); return new SolrCore(this, dcore, coreConfig); } } catch (SolrException se) { diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 4933559cc1a..db707964db2 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -1149,7 +1149,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission if (optionalMaxTerm.isPresent()) { liveReplicas.stream() .filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong()) - .forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName())); + .forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName())); } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java index fbf24a119e0..179589b5790 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java @@ -28,7 +28,6 @@ import org.apache.lucene.store.Directory; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; -import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; @@ -234,16 +233,6 @@ enum CoreAdminOperation implements CoreAdminOp { if (cname == null) { throw new IllegalArgumentException(CoreAdminParams.CORE + " is required"); } - try (SolrCore core = it.handler.coreContainer.getCore(cname)) { - - // Setting the last published state for this core to be ACTIVE - if (core != null) { - core.getCoreDescriptor().getCloudDescriptor().setLastPublished(Replica.State.ACTIVE); - log().info("Setting the last published state for this core, {}, to {}", core.getName(), Replica.State.ACTIVE); - } else { - SolrException.log(log(), "Could not find core: " + cname); - } - } }), BACKUPCORE_OP(BACKUPCORE, new BackupCoreOp()), diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java index 36477359d0f..d064e78526a 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java @@ -127,7 +127,10 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp { ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName()); // if the replica is waiting for leader to see recovery state, the leader should refresh its terms - if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) { + if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && shardTerms.skipSendingUpdatesTo(coreNodeName)) { + // The replica changed it term, then published itself as RECOVERING. 
+ // This core already see replica as RECOVERING + // so it is guarantees that a live-fetch will be enough for this core to see max term published shardTerms.refreshTerms(); } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java b/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java index 03d1478caac..dbb2af0537e 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java @@ -20,8 +20,10 @@ package org.apache.solr.handler.admin; import java.net.URI; import java.util.Optional; +import org.apache.solr.cloud.CloudDescriptor; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.core.SolrCore; @@ -61,11 +63,22 @@ class RestoreCoreOp implements CoreAdminHandler.CoreAdminOp { URI locationUri = repository.createURI(location); try (SolrCore core = it.handler.coreContainer.getCore(cname)) { + CloudDescriptor cd = core.getCoreDescriptor().getCloudDescriptor(); + // this core must be the only replica in its shard otherwise + // we cannot guarantee consistency between replicas because when we add data (or restore index) to this replica + Slice slice = zkController.getClusterState().getCollection(cd.getCollectionName()).getSlice(cd.getShardId()); + if (slice.getReplicas().size() != 1) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Failed to restore core=" + core.getName() + ", the core must be the only replica in its shard"); + } RestoreCore restoreCore = new RestoreCore(repository, core, locationUri, name); boolean success = restoreCore.doRestore(); if (!success) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to restore core=" + core.getName()); } + // other replicas to-be-created will know that they are out of date by + // looking at their term : 0 compare to term of this core : 1 + zkController.getShardTerms(cd.getCollectionName(), cd.getShardId()).ensureHighestTermsAreNotZero(); } } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java index 5267c75a8c0..5e924d8b82e 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java @@ -23,6 +23,8 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.ZkShardTerms; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; @@ -111,6 +113,16 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp { SolrCore newcore = it.handler.coreContainer.getCore(newCoreName); if (newcore != null) { newCores.add(newcore); + if (it.handler.coreContainer.isZooKeeperAware()) { + // this core must be the only replica in its shard otherwise + // we cannot guarantee consistency between replicas because when we add data to this replica + CloudDescriptor cd = newcore.getCoreDescriptor().getCloudDescriptor(); + ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState(); + if (clusterState.getCollection(cd.getCollectionName()).getSlice(cd.getShardId()).getReplicas().size() != 1) { + throw new 
SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Core with core name " + newCoreName + " must be the only replica in shard " + cd.getShardId()); + } + } } else { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Core with core name " + newCoreName + " expected but doesn't exist."); } @@ -123,6 +135,15 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp { SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName, splitKey); core.getUpdateHandler().split(cmd); + if (it.handler.coreContainer.isZooKeeperAware()) { + for (SolrCore newcore : newCores) { + // the index of the core changed from empty to have some data, its term must be not zero + CloudDescriptor cd = newcore.getCoreDescriptor().getCloudDescriptor(); + ZkShardTerms zkShardTerms = it.handler.coreContainer.getZkController().getShardTerms(cd.getCollectionName(), cd.getShardId()); + zkShardTerms.ensureHighestTermsAreNotZero(); + } + } + // After the split has completed, someone (here?) should start the process of replaying the buffered updates. } catch (Exception e) { diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index b37cb9c915f..50705825e3d 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -173,6 +172,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { private boolean forwardToLeader = false; private boolean isSubShardLeader = false; private List nodes; + private Set skippedCoreNodeNames; + private boolean isIndexChanged = false; private UpdateCommand updateCommand; // the current command this processor is working on. @@ -334,9 +335,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // that means I want to forward onto my replicas... // so get the replicas... 
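RestoreCoreOp and SplitOp both finish by calling ensureHighestTermsAreNotZero() because a freshly registered replica sits at term 0; once the shard's index holds any data, every existing replica is raised to at least 1, so a replica created later at 0 is recognised as out of date (setTermToZero in CoreContainer is the inverse, used when an index is wiped). A small stand-alone illustration with plain maps and illustrative names:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class TermZeroSketch {
    static Map<String, Long> ensureHighestTermsAreNotZero(Map<String, Long> terms) {
        long max = terms.isEmpty() ? 0L : Collections.max(terms.values());
        if (max > 0) return terms;                     // already past the "empty index" state
        Map<String, Long> next = new HashMap<>();
        for (String replica : terms.keySet()) next.put(replica, 1L);
        return next;
    }

    public static void main(String[] args) {
        Map<String, Long> terms = new HashMap<>();
        terms.put("core_node1", 0L);                   // restored/split core that now holds data
        terms = ensureHighestTermsAreNotZero(terms);
        terms.put("core_node2", 0L);                   // replica added afterwards, still empty
        System.out.println(terms);                     // core_node1=1, core_node2=0 -> core_node2 must recover
    }
}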
forwardToLeader = false; - List replicaProps = zkController.getZkStateReader() - .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN); - if (replicaProps == null) { + ClusterState clusterState = zkController.getZkStateReader().getClusterState(); + String leaderCoreNodeName = leaderReplica.getName(); + List replicas = clusterState.getCollection(collection) + .getSlice(shardId) + .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); + replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName)); + if (replicas.isEmpty()) { return null; } @@ -349,16 +354,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { log.info("test.distrib.skip.servers was found and contains:" + skipListSet); } - List nodes = new ArrayList<>(replicaProps.size()); + List nodes = new ArrayList<>(replicas.size()); + skippedCoreNodeNames = new HashSet<>(); ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); - for (ZkCoreNodeProps props : replicaProps) { - String coreNodeName = ((Replica) props.getNodeProps()).getName(); - if (skipList != null && skipListSet.contains(props.getCoreUrl())) { - log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true"); - } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) { - log.info("skip url:{} cause its term is less than leader", props.getCoreUrl()); + for (Replica replica: replicas) { + String coreNodeName = replica.getName(); + if (skipList != null && skipListSet.contains(replica.getCoreUrl())) { + log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true"); + } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) { + log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl()); + skippedCoreNodeNames.add(replica.getName()); + } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) { + skippedCoreNodeNames.add(replica.getName()); } else { - nodes.add(new StdNode(props, collection, shardId)); + nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId)); } } return nodes; @@ -750,6 +759,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // TODO: optionally fail if n replicas are not reached... private void doFinish() { + boolean shouldUpdateTerms = isLeader && !isOldLIRMode && isIndexChanged; + if (shouldUpdateTerms) { + ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()); + if (skippedCoreNodeNames != null) { + zkShardTerms.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), skippedCoreNodeNames); + } + zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero(); + } // TODO: if not a forward and replication req is not specified, we could // send in a background thread @@ -758,7 +775,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // TODO - we may need to tell about more than one error... 
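The DistributedUpdateProcessor changes above have two halves: when fanning out an update, the leader skips replicas whose term already trails the maximum (plus replicas that are down or not live), and in doFinish() it raises its own term above everything it skipped, so those replicas must recover before they can lead. A condensed sketch of that decision follows; pickTargets and its two-map inputs are illustrative, not the actual getNodes() signature.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class UpdateFanOutSketch {
    // Replicas whose term trails the shard maximum, or which are not live, are skipped
    // and remembered; doFinish() then calls ensureTermsIsHigher(leader, skipped).
    static List<String> pickTargets(Map<String, Long> terms, Map<String, Boolean> live,
                                    String leader, Set<String> skipped) {
        long max = Collections.max(terms.values());
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : live.entrySet()) {
            String replica = e.getKey();
            if (replica.equals(leader)) continue;
            boolean lagging = terms.getOrDefault(replica, 0L) < max;   // skipSendingUpdatesTo(...)
            if (lagging || !e.getValue()) {
                skipped.add(replica);
            } else {
                targets.add(replica);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, Long> terms = new HashMap<>();
        terms.put("core_node1", 2L);  terms.put("core_node2", 1L);  terms.put("core_node3", 2L);
        Map<String, Boolean> live = new HashMap<>();
        live.put("core_node1", true); live.put("core_node2", true); live.put("core_node3", false);
        Set<String> skipped = new HashSet<>();
        System.out.println(pickTargets(terms, live, "core_node1", skipped)); // []
        System.out.println(skipped); // core_node2 and core_node3 -> ensureTermsIsHigher("core_node1", skipped)
    }
}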
List errorsForClient = new ArrayList<>(errors.size()); - Map> failedReplicas = new HashMap<>(); + Set replicasShouldBeInLowerTerms = new HashSet<>(); for (final SolrCmdDistributor.Error error : errors) { if (error.req.node instanceof RetryNode) { @@ -856,9 +873,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { Throwable rootCause = SolrException.getRootCause(error.e); if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) { log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause); - ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName); - failedReplicas.putIfAbsent(shardInfo, new HashSet<>()); - failedReplicas.get(shardInfo).add(coreNodeName); + replicasShouldBeInLowerTerms.add(coreNodeName); } else { // The replica did not registered its term, so it must run with old LIR implementation log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause); @@ -891,11 +906,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } } - if (!isOldLIRMode) { - for (Map.Entry> entry : failedReplicas.entrySet()) { - ShardInfo shardInfo = entry.getKey(); - zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue()); - } + if (!isOldLIRMode && !replicasShouldBeInLowerTerms.isEmpty()) { + zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()) + .ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms); } // in either case, we need to attach the achieved and min rf to the response. if (leaderReplicationTracker != null || rollupReplicationTracker != null) { @@ -928,48 +941,17 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { throw new DistributedUpdatesAsyncException(errorsForClient); } } - - private class ShardInfo { - private String collection; - private String shard; - private String leader; - - public ShardInfo(String collection, String shard, String leader) { - this.collection = collection; - this.shard = shard; - this.leader = leader; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ShardInfo shardInfo = (ShardInfo) o; - - if (!collection.equals(shardInfo.collection)) return false; - if (!shard.equals(shardInfo.shard)) return false; - return leader.equals(shardInfo.leader); - } - - @Override - public int hashCode() { - int result = collection.hashCode(); - result = 31 * result + shard.hashCode(); - result = 31 * result + leader.hashCode(); - return result; - } - } - // must be synchronized by bucket private void doLocalAdd(AddUpdateCommand cmd) throws IOException { super.processAdd(cmd); + isIndexChanged = true; } // must be synchronized by bucket private void doLocalDelete(DeleteUpdateCommand cmd) throws IOException { super.processDelete(cmd); + isIndexChanged = true; } /** diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java new file mode 100644 index 00000000000..ee8bf51ac5e --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.solr.JSONTestUtil; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.util.TimeOut; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestCloudConsistency extends SolrCloudTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static Map proxies; + private static Map jettys; + + @BeforeClass + public static void setupCluster() throws Exception { + System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory"); + System.setProperty("solr.ulog.numRecordsToKeep", "1000"); + + configureCluster(4) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + // Add proxies + proxies = new HashMap<>(cluster.getJettySolrRunners().size()); + jettys = new HashMap<>(); + for (JettySolrRunner jetty:cluster.getJettySolrRunners()) { + SocketProxy proxy = new SocketProxy(); + jetty.setProxyPort(proxy.getListenPort()); + cluster.stopJettySolrRunner(jetty);//TODO: Can we avoid this restart + cluster.startJettySolrRunner(jetty); + proxy.open(jetty.getBaseUrl().toURI()); + LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". 
Proxy: " + proxy.getUrl()); + proxies.put(jetty, proxy); + jettys.put(proxy.getUrl(), jetty); + } + } + + @AfterClass + public static void tearDownCluster() throws Exception { + for (SocketProxy proxy:proxies.values()) { + proxy.close(); + } + proxies = null; + jettys = null; + } + + @Test + public void testOutOfSyncReplicasCannotBecomeLeader() throws Exception { + testOutOfSyncReplicasCannotBecomeLeader(false); + } + + @Test + public void testOutOfSyncReplicasCannotBecomeLeaderAfterRestart() throws Exception { + testOutOfSyncReplicasCannotBecomeLeader(true); + } + + public void testOutOfSyncReplicasCannotBecomeLeader(boolean onRestart) throws Exception { + final String collectionName = "outOfSyncReplicasCannotBecomeLeader-"+onRestart; + CollectionAdminRequest.createCollection(collectionName, 1, 3) + .setCreateNodeSet("") + .process(cluster.getSolrClient()); + CollectionAdminRequest.addReplicaToShard(collectionName, "shard1") + .setNode(cluster.getJettySolrRunner(0).getNodeName()) + .process(cluster.getSolrClient()); + waitForState("Timeout waiting for shard leader", collectionName, clusterShape(1, 1)); + + CollectionAdminRequest.addReplicaToShard(collectionName, "shard1") + .setNode(cluster.getJettySolrRunner(1).getNodeName()) + .process(cluster.getSolrClient()); + CollectionAdminRequest.addReplicaToShard(collectionName, "shard1") + .setNode(cluster.getJettySolrRunner(2).getNodeName()) + .process(cluster.getSolrClient()); + waitForState("Timeout waiting for 1x3 collection", collectionName, clusterShape(1, 3)); + + addDocs(collectionName, 3, 1); + + final Replica oldLeader = getCollectionState(collectionName).getSlice("shard1").getLeader(); + assertEquals(cluster.getJettySolrRunner(0).getNodeName(), oldLeader.getNodeName()); + + if (onRestart) { + addDocToWhenOtherReplicasAreDown(collectionName, oldLeader, 4); + } else { + addDocWhenOtherReplicasAreNetworkPartitioned(collectionName, oldLeader, 4); + } + + assertDocsExistInAllReplicas(getCollectionState(collectionName).getReplicas(), collectionName, 1, 4); + + CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient()); + } + + + /** + * Adding doc when replicas (not leader) are down, + * These replicas are out-of-sync hence they should not become leader even when current leader is DOWN. 
+ * Leader should be on node - 0 + */ + private void addDocToWhenOtherReplicasAreDown(String collection, Replica leader, int docId) throws Exception { + ChaosMonkey.stop(cluster.getJettySolrRunner(1)); + ChaosMonkey.stop(cluster.getJettySolrRunner(2)); + waitForState("", collection, (liveNodes, collectionState) -> + collectionState.getSlice("shard1").getReplicas().stream() + .filter(replica -> replica.getState() == Replica.State.DOWN).count() == 2); + + addDocs(collection, 1, docId); + ChaosMonkey.stop(cluster.getJettySolrRunner(0)); + waitForState("", collection, (liveNodes, collectionState) -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN); + + ChaosMonkey.start(cluster.getJettySolrRunner(1)); + ChaosMonkey.start(cluster.getJettySolrRunner(2)); + TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.CURRENT_TIME); + while (!timeOut.hasTimedOut()) { + Replica newLeader = getCollectionState(collection).getSlice("shard1").getLeader(); + if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) { + fail("Out of sync replica became leader " + newLeader); + } + } + + ChaosMonkey.start(cluster.getJettySolrRunner(0)); + waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> { + Replica newLeader = collectionState.getLeader("shard1"); + return newLeader != null && newLeader.getName().equals(leader.getName()); + }); + waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3)); + } + + + /** + * Adding doc when replicas (not leader) are network partitioned with leader, + * These replicas are out-of-sync hence they should not become leader even when current leader is DOWN. + * Leader should be on node - 0 + */ + private void addDocWhenOtherReplicasAreNetworkPartitioned(String collection, Replica leader, int docId) throws Exception { + for (int i = 0; i < 3; i++) { + proxies.get(cluster.getJettySolrRunner(i)).close(); + } + addDoc(collection, docId, cluster.getJettySolrRunner(0)); + ChaosMonkey.stop(cluster.getJettySolrRunner(0)); + for (int i = 1; i < 3; i++) { + proxies.get(cluster.getJettySolrRunner(i)).reopen(); + } + waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState) + -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN); + + TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.CURRENT_TIME); + while (!timeOut.hasTimedOut()) { + Replica newLeader = getCollectionState(collection).getLeader("shard1"); + if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) { + fail("Out of sync replica became leader " + newLeader); + } + } + + proxies.get(cluster.getJettySolrRunner(0)).reopen(); + ChaosMonkey.start(cluster.getJettySolrRunner(0)); + waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> { + Replica newLeader = collectionState.getLeader("shard1"); + return newLeader != null && newLeader.getName().equals(leader.getName()); + }); + waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3)); + } + + private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException { + List docs = new ArrayList<>(numDocs); + for (int i = 0; i < numDocs; i++) { + int id = startId + i; + docs.add(new SolrInputDocument("id", String.valueOf(id), "fieldName_s", String.valueOf(id))); + } + 
cluster.getSolrClient().add(collection, docs); + cluster.getSolrClient().commit(collection); + } + + private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException { + try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) { + solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId))); + } + } + + private void assertDocsExistInAllReplicas(List notLeaders, + String testCollectionName, int firstDocId, int lastDocId) throws Exception { + Replica leader = + cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000); + HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName); + List replicas = + new ArrayList(notLeaders.size()); + + for (Replica r : notLeaders) { + replicas.add(getHttpSolrClient(r, testCollectionName)); + } + try { + for (int d = firstDocId; d <= lastDocId; d++) { + String docId = String.valueOf(d); + assertDocExists(leaderSolr, testCollectionName, docId); + for (HttpSolrClient replicaSolr : replicas) { + assertDocExists(replicaSolr, testCollectionName, docId); + } + } + } finally { + if (leaderSolr != null) { + leaderSolr.close(); + } + for (HttpSolrClient replicaSolr : replicas) { + replicaSolr.close(); + } + } + } + + private void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception { + NamedList rsp = realTimeGetDocId(solr, docId); + String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId); + assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL() + + " due to: " + match + "; rsp="+rsp, match == null); + } + + private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException { + QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false")); + return solr.request(qr); + } + + protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception { + ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica); + String url = zkProps.getBaseUrl() + "/" + coll; + return getHttpSolrClient(url); + } + + + protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception { + String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); + assertNotNull(replicaBaseUrl); + URL baseUrl = new URL(replicaBaseUrl); + + JettySolrRunner proxy = jettys.get(baseUrl.toURI()); + assertNotNull("No proxy found for " + baseUrl + "!", proxy); + return proxy; + } + +} diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java index 146f5c0ccd5..c10ec0f0241 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java @@ -425,10 +425,19 @@ public class TestPullReplica extends SolrCloudTestCase { Replica pullReplica = docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0); assertTrue(pullReplica.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())); + long highestTerm = 0L; + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) { + highestTerm = zkShardTerms.getHighestTerm(); + } // add document, this should fail since there is no leader. 
Pull replica should not accept the update expectThrows(SolrException.class, () -> cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo")) ); + if (removeReplica) { + try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) { + assertEquals(highestTerm, zkShardTerms.getHighestTerm()); + } + } // Also fails if I send the update to the pull replica explicitly try (HttpSolrClient pullReplicaClient = getHttpSolrClient(docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) { @@ -436,6 +445,11 @@ public class TestPullReplica extends SolrCloudTestCase { cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo")) ); } + if (removeReplica) { + try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) { + assertEquals(highestTerm, zkShardTerms.getHighestTerm()); + } + } // Queries should still work waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL))); diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java index 037517b1411..d557b2915ee 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java @@ -94,7 +94,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase { assertEquals(1L, rep1Terms.getTerm("rep1")); waitFor(1L, () -> rep2Terms.getTerm("rep1")); - rep2Terms.setEqualsToMax("rep2"); + rep2Terms.setTermEqualsToLeader("rep2"); assertEquals(1L, rep2Terms.getTerm("rep2")); rep2Terms.registerTerm("rep2"); assertEquals(1L, rep2Terms.getTerm("rep2")); @@ -138,7 +138,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase { while (!stop.get()) { try { Thread.sleep(random().nextInt(200)); - zkShardTerms.setEqualsToMax(replica); + zkShardTerms.setTermEqualsToLeader(replica); } catch (InterruptedException e) { e.printStackTrace(); } @@ -178,7 +178,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase { waitFor(1, count::get); leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica")); waitFor(2, count::get); - replicaTerms.setEqualsToMax("replica"); + replicaTerms.setTermEqualsToLeader("replica"); waitFor(3, count::get); assertEquals(0, replicaTerms.getNumListeners()); @@ -194,6 +194,41 @@ public class ZkShardTermsTest extends SolrCloudTestCase { assertEquals(1L, terms.getTerm("leader").longValue()); } + public void testSetTermToZero() { + String collection = "setTermToZero"; + ZkShardTerms terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient()); + terms.registerTerm("leader"); + terms.registerTerm("replica"); + terms.ensureTermsIsHigher("leader", Collections.singleton("replica")); + assertEquals(1L, terms.getTerm("leader")); + terms.setTermToZero("leader"); + assertEquals(0L, terms.getTerm("leader")); + terms.close(); + } + + public void testReplicaCanBecomeLeader() throws InterruptedException { + String collection = "replicaCanBecomeLeader"; + ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient()); + ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient()); + leaderTerms.registerTerm("leader"); + replicaTerms.registerTerm("replica"); + + leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica")); + waitFor(false, () -> replicaTerms.canBecomeLeader("replica")); + waitFor(true, () -> 
leaderTerms.skipSendingUpdatesTo("replica")); + + replicaTerms.startRecovering("replica"); + waitFor(false, () -> replicaTerms.canBecomeLeader("replica")); + waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica")); + + replicaTerms.doneRecovering("replica"); + waitFor(true, () -> replicaTerms.canBecomeLeader("replica")); + waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica")); + + leaderTerms.close(); + replicaTerms.close(); + } + private void waitFor(T expected, Supplier supplier) throws InterruptedException { TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource()); while (!timeOut.hasTimedOut()) {