mirror of https://github.com/apache/lucene.git
SOLR-12767: Always include the achieved rf in the response
This commit deprecates the min_rf parameter. Solr now always includes the achieved replication factor in the response to update requests (as if min_rf had always been specified). It also reverts the changes introduced in SOLR-8034: replicas that don't ack an update will have to recover, to prevent inconsistent shards.
parent 751bf7db20
commit 46f753d7c6
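For reference, after this change a SolrJ client can read the achieved replication factor from any update response without setting min_rf. The sketch below is illustrative only, not part of the commit: it reuses the CloudSolrClient.request and getMinAchievedReplicationFactor calls exercised by the tests in this commit, and the ZooKeeper address and collection name are assumptions.

import java.util.Collections;
import java.util.Optional;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;

public class AchievedRfExample {
  public static void main(String[] args) throws Exception {
    // Assumed endpoints: a SolrCloud cluster behind this ZooKeeper ensemble
    // and an existing collection named "collection1".
    try (CloudSolrClient client = new CloudSolrClient.Builder(
        Collections.singletonList("localhost:9983"), Optional.empty()).build()) {
      client.setDefaultCollection("collection1");

      UpdateRequest up = new UpdateRequest();
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "1");
      up.add(doc);

      // Since SOLR-12767 the achieved replication factor ("rf") is always in the
      // response header; no min_rf parameter needs to be set on the request.
      NamedList<Object> response = client.request(up);
      int achievedRf = client.getMinAchievedReplicationFactor(
          client.getDefaultCollection(), response);
      System.out.println("achieved rf = " + achievedRf);
    }
  }
}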
@@ -101,6 +101,11 @@ Velocity 1.7 and Velocity Tools 2.0
 Apache ZooKeeper 3.4.11
 Jetty 9.4.11.v20180605
 
+Upgrade Notes
+----------------------
+* SOLR-12767: The min_rf parameter is no longer needed, Solr will always return the achieved replication factor (rf)
+  in the response header.
+
 New Features
 ----------------------
 
@@ -140,6 +145,15 @@ Bug Fixes
 
 * SOLR-12750: Migrate API should lock the collection instead of shard. (shalin)
 
+* SOLR-12767: When using min_rf, shard leader skipped bad replicas instead of marking them for recovery, this could
+  create inconsistencies between replicas of the same shard. min_rf parameter is now deprecated, and even if provided
+  replicas that don't ack an update from the leader will be marked for recovery. (Tomás Fernández Löbbe)
+
+Improvements
+----------------------
+* SOLR-12767: Solr now always includes in the response of update requests the achieved replication factor
+  (Tomás Fernández Löbbe)
+
 ================== 7.5.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
@@ -386,10 +386,10 @@ public class SolrCmdDistributor implements Closeable {
 // we need to add the data to the rollup tracker.
 //
 // In the case of a leaderTracker and rollupTracker both being present, then we need to take care when assembling
-// the final response to check both the rollup and leader trackers on the aggrator node.
+// the final response to check both the rollup and leader trackers on the aggregator node.
 public void trackRequestResult(HttpResponse resp, boolean success) {
 
-// Returing Integer.MAX_VALUE here means there was no "rf" on the response, therefore we just need to increment
+// Returning Integer.MAX_VALUE here means there was no "rf" on the response, therefore we just need to increment
 // our achieved rf if we are a leader, i.e. have a leaderTracker.
 int rfFromResp = getRfFromResponse(resp);

@@ -420,7 +420,7 @@ public class SolrCmdDistributor implements Closeable {
 }
 }
 } catch (Exception e) {
-log.warn("Failed to parse response from " + node + " during replication factor accounting due to: " + e);
+log.warn("Failed to parse response from {} during replication factor accounting", node, e);
 } finally {
 if (inputStream != null) {
 try {
@@ -692,6 +692,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 zkController.getBaseUrl(), req.getCore().getName()));
 
 if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
+// TODO: Kept for rolling upgrades only. Should be removed in Solr 9
 params.set(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
 }
 
@@ -729,43 +730,32 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 }
 
 // helper method, processAdd was getting a bit large.
-// Sets replicationTracker = null if we aren't the leader or don't care abeout minRf
-// We have three possibilities here:
+// Sets replicationTracker = null if we aren't the leader
+// We have two possibilities here:
 //
-// 1> there is no min_rf specified: Just return
-// 2> we are a leader: Allocate a LeaderTracker and, if we're getting the original request, a RollupTracker
-// 3> we're a follower: allocat a RollupTracker
+// 1> we are a leader: Allocate a LeaderTracker and, if we're getting the original request, a RollupTracker
+// 2> we're a follower: allocat a RollupTracker
 //
 private void checkReplicationTracker(UpdateCommand cmd) {
-String repFact = req.getParams().get(UpdateRequest.MIN_REPFACT);
-
-if (zkEnabled == false || repFact == null) {
+if (zkEnabled == false) {
 rollupReplicationTracker = null; // never need one of these in stand-alone
 leaderReplicationTracker = null;
 return;
 }
 
-int requestedReplicationFactor;
-
-try {
-requestedReplicationFactor = Integer.parseInt(repFact);
-} catch (NumberFormatException nfe) {
-throw new SolrException(ErrorCode.SERVER_ERROR, "MinRF must be an integer, was " + repFact);
-}
-
 SolrParams rp = cmd.getReq().getParams();
 String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM);
 // Ok,we're receiving the original request, we need a rollup tracker, but only one so we accumulate over the
 // course of a batch.
 if ((distribUpdate == null || DistribPhase.NONE.toString().equals(distribUpdate)) &&
 rollupReplicationTracker == null) {
-rollupReplicationTracker = new RollupRequestReplicationTracker(repFact);
+rollupReplicationTracker = new RollupRequestReplicationTracker();
 }
 // If we're a leader, we need a leader replication tracker, so let's do that. If there are multiple docs in
 // a batch we need to use the _same_ leader replication tracker.
 if (isLeader && leaderReplicationTracker == null) {
 leaderReplicationTracker = new LeaderRequestReplicationTracker(
-req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId(), requestedReplicationFactor);
+req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
 }
 }
 
@@ -877,12 +867,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 }
 }
 
-// If the client specified minRf and we didn't achieve the minRf, don't send recovery and let client retry
-if (leaderReplicationTracker != null &&
-leaderReplicationTracker.getAchievedRf() < leaderReplicationTracker.getRequestedRf()) {
-continue;
-}
-
 if (leaderCoreNodeName != null && cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) // we are still same leader
 && foundErrorNodeInReplicaList // we found an error for one of replicas
 && !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR
@@ -933,15 +917,23 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId())
 .ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms);
 }
-// in either case, we need to attach the achieved and min rf to the response.
+handleReplicationFactor();
+if (0 < errorsForClient.size()) {
+throw new DistributedUpdatesAsyncException(errorsForClient);
+}
+}
+
+/**
+ * If necessary, include in the response the achieved replication factor
+ */
+@SuppressWarnings("deprecation")
+private void handleReplicationFactor() {
 if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
 int achievedRf = Integer.MAX_VALUE;
-int requestedRf = Integer.MAX_VALUE;
 
 if (leaderReplicationTracker != null) {
 
 achievedRf = leaderReplicationTracker.getAchievedRf();
-requestedRf = leaderReplicationTracker.getRequestedRf();
 
 // Transfer this to the rollup tracker if it exists
 if (rollupReplicationTracker != null) {
@@ -952,19 +944,18 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 // Rollup tracker has accumulated stats.
 if (rollupReplicationTracker != null) {
 achievedRf = rollupReplicationTracker.getAchievedRf();
-requestedRf = rollupReplicationTracker.getRequestedRf();
 }
-rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, requestedRf);
+if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
+// Unused, but kept for back compatibility. To be removed in Solr 9
+rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, Integer.parseInt(req.getParams().get(UpdateRequest.MIN_REPFACT)));
+}
 rsp.getResponseHeader().add(UpdateRequest.REPFACT, achievedRf);
 rollupReplicationTracker = null;
 leaderReplicationTracker = null;
 
 }
-if (0 < errorsForClient.size()) {
-throw new DistributedUpdatesAsyncException(errorsForClient);
-}
 }
 
 // must be synchronized by bucket
 private void doLocalAdd(AddUpdateCommand cmd) throws IOException {
 super.processAdd(cmd);
@@ -1455,6 +1446,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 zkController.getBaseUrl(), req.getCore().getName()));
 
 if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
+// TODO: Kept for rolling upgrades only. Remove in Solr 9
 params.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
 }
 cmdDistrib.distribDelete(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker);
@@ -1520,8 +1512,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 ? zkController.getClusterState().getCollection(collection) : null;
 
 if (zkEnabled && DistribPhase.NONE == phase) {
-if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null && rollupReplicationTracker == null) {
-rollupReplicationTracker = new RollupRequestReplicationTracker(req.getParams().get(UpdateRequest.MIN_REPFACT));
+if (rollupReplicationTracker == null) {
+rollupReplicationTracker = new RollupRequestReplicationTracker();
 }
 boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard
 
@@ -1564,10 +1556,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 outParams.remove("commit"); // this will be distributed from the local commit
 
 
-if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
-// If we've determined that there are no docs on this shard that need to be deleted, then we don't send
-// sub-requests to any other replicas for this shard. In this case, min_rf is meaningless for this shard
-// so flag that in replicationTracker
+if (params.get(UpdateRequest.MIN_REPFACT) != null) {
+// TODO: Kept this for rolling upgrades. Remove in Solr 9
 outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
 }
 cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
@@ -1580,10 +1570,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 phase = DistribPhase.TOLEADER;
 }
 
-// check if client has requested minimum replication factor information. will set replicationTracker to null if
-// we aren't the leader or subShardLeader
-checkReplicationTracker(cmd);
-
 List<Node> replicas = null;
 
 if (zkEnabled && DistribPhase.TOLEADER == phase) {
@@ -1593,6 +1579,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 } else if (DistribPhase.FROMLEADER == phase) {
 isLeader = false;
 }
 
+// check if client has requested minimum replication factor information. will set replicationTracker to null if
+// we aren't the leader or subShardLeader
+checkReplicationTracker(cmd);
+
 if (vinfo == null) {
 super.processDelete(cmd);
@@ -2094,7 +2085,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 // lifetime of the batch. The leader for each shard keeps track of it's own achieved replicaiton for its shard
 // and attaches that to the response to the originating node (i.e. the one with the RollupReplicationTracker).
 // Followers in general do not need a tracker of any sort with the sole exception of the RollupReplicationTracker
-// allocated on the original node that recieves the top-level request.
+// allocated on the original node that receives the top-level request.
 //
 // DeleteById is tricky. Since the docs are sent one at a time, there has to be some fancy dancing. In the
 // deleteById case, here are the rules:
@@ -2117,24 +2108,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 public static class RollupRequestReplicationTracker {
 
 private int achievedRf = Integer.MAX_VALUE;
-private final int requestedRf;
-
-public RollupRequestReplicationTracker(String minRepFact) {
-try {
-this.requestedRf = Integer.parseInt(minRepFact);
-} catch (NumberFormatException nfe) {
-throw new SolrException(ErrorCode.SERVER_ERROR, "MinRF must be an integer, was " + minRepFact);
-}
-}
 
 public int getAchievedRf() {
 return achievedRf;
 }
 
-public int getRequestedRf() {
-return requestedRf;
-}
-
 // We want to report only the minimun _ever_ achieved...
 public void testAndSetAchievedRf(int rf) {
 this.achievedRf = Math.min(this.achievedRf, rf);
@@ -2142,8 +2120,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
 public String toString() {
 StringBuilder sb = new StringBuilder("RollupRequestReplicationTracker")
-.append(", requestedRf: ")
-.append(requestedRf)
 .append(" achievedRf: ")
 .append(achievedRf);
 return sb.toString();
@@ -2164,16 +2140,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 // Since we only allocate one of these on the leader and, by definition, the leader has been found and is running,
 // we have a replication factor of one by default.
 private int achievedRf = 1;
-private final int requestedRf;
 
 private final String myShardId;
 
-int getRequestedRf() {
-return requestedRf;
-}
-
-public LeaderRequestReplicationTracker(String shardId, int requestedRf) {
-this.requestedRf = requestedRf;
+public LeaderRequestReplicationTracker(String shardId) {
 this.myShardId = shardId;
 }
 
@@ -2197,9 +2167,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 sb.append(", achievedRf=")
 .append(getAchievedRf())
 .append(" for shard ")
-.append(myShardId)
-.append(" requested replication factor: ")
-.append(requestedRf);
+.append(myShardId);
 return sb.toString();
 }
 }
@@ -16,6 +16,9 @@
 */
 package org.apache.solr.cloud;
 
+import static org.apache.solr.common.cloud.Replica.State.DOWN;
+import static org.apache.solr.common.cloud.Replica.State.RECOVERING;
+
 import java.io.File;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -31,7 +34,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
@@ -41,7 +43,6 @@ import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
@@ -66,9 +67,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.cloud.Replica.State.DOWN;
-import static org.apache.solr.common.cloud.Replica.State.RECOVERING;
-
 /**
 * Simulates HTTP partitions between a leader and replica but the replica does
 * not lose its ZooKeeper connection.
@@ -135,12 +133,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
 
 testDoRecoveryOnRestart();
 
-// Tests that if we set a minRf that's not satisfied, no recovery is requested, but if minRf is satisfied,
-// recovery is requested
-testMinRf();
-
 waitForThingsToLevelOut(30000);
 
 // test a 1x2 collection
 testRf2();
@@ -267,95 +259,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
 attemptCollectionDelete(cloudClient, testCollectionName);
 }
 
-protected void testMinRf() throws Exception {
-// create a collection that has 1 shard and 3 replicas
-String testCollectionName = "collMinRf_1x3";
-createCollection(testCollectionName, "conf1", 1, 3, 1);
-cloudClient.setDefaultCollection(testCollectionName);
-
-// term of the core still be watched even when the core is reloaded
-CollectionAdminRequest.reloadCollection(testCollectionName).process(cloudClient);
-
-sendDoc(1, 2);
-
-JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
-List<Replica> notLeaders =
-ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
-assertTrue("Expected 2 non-leader replicas for collection " + testCollectionName
-+ " but found " + notLeaders.size() + "; clusterState: "
-+ printClusterStateInfo(testCollectionName),
-notLeaders.size() == 2);
-
-assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
-
-// Now introduce a network partition between the leader and 1 replica, so a minRf of 2 is still achieved
-log.info("partitioning replica : " + notLeaders.get(0));
-SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
-SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
-
-proxy0.close();
-// leader still can connect to replica 2, by closing leaderProxy, replica 1 can not do recovery
-leaderProxy.close();
-
-// indexing during a partition
-int achievedRf = sendDoc(2, 2, leaderJetty);
-assertEquals("Unexpected achieved replication factor", 2, achievedRf);
-try (ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, "shard1", cloudClient.getZkStateReader().getZkClient())) {
-assertFalse(zkShardTerms.canBecomeLeader(notLeaders.get(0).getName()));
-}
-Thread.sleep(sleepMsBeforeHealPartition);
-proxy0.reopen();
-leaderProxy.reopen();
-
-notLeaders =
-ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
-
-// Since minRf is achieved, we expect recovery, so we expect seeing 2 documents
-assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 2);
-
-// Now introduce a network partition between the leader and both of its replicas, so a minRf of 2 is NOT achieved
-proxy0 = getProxyForReplica(notLeaders.get(0));
-proxy0.close();
-SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-proxy1.close();
-leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
-leaderProxy.close();
-
-achievedRf = sendDoc(3, 2, leaderJetty);
-assertEquals("Unexpected achieved replication factor", 1, achievedRf);
-
-Thread.sleep(sleepMsBeforeHealPartition);
-
-// Verify that the partitioned replicas are NOT DOWN since minRf wasn't achieved
-ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, 1);
-
-proxy0.reopen();
-proxy1.reopen();
-leaderProxy.reopen();
-
-notLeaders =
-ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
-
-// Check that doc 3 is on the leader but not on the notLeaders
-Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
-try (HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName)) {
-assertDocExists(leaderSolr, testCollectionName, "3");
-}
-
-for (Replica notLeader : notLeaders) {
-try (HttpSolrClient notLeaderSolr = getHttpSolrClient(notLeader, testCollectionName)) {
-assertDocNotExists(notLeaderSolr, testCollectionName, "3");
-}
-}
-
-// Retry sending doc 3
-achievedRf = sendDoc(3, 2);
-assertEquals("Unexpected achieved replication factor", 3, achievedRf);
-
-// Now doc 3 should be on all replicas
-assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 3);
-}
-
 protected void testRf2() throws Exception {
 // create a collection that has 1 shard but 2 replicas
 String testCollectionName = "c8n_1x2";
@@ -102,7 +102,6 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
 int maxShardsPerNode = 1;
 String testCollectionName = "repfacttest_c8n_2x2";
 String shardId = "shard1";
-int minRf = 2;
 
 createCollectionWithRetry(testCollectionName, "conf1", numShards, replicationFactor, maxShardsPerNode);
 
@@ -122,7 +121,7 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
 
 // send directly to the leader using HttpSolrServer instead of CloudSolrServer (to test support for non-direct updates)
 UpdateRequest up = new UpdateRequest();
-up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
+maybeAddMinRfExplicitly(2, up);
 up.add(batch);
 
 Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
@@ -230,13 +229,13 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
 // Delete the docs by ID indicated
 UpdateRequest req = new UpdateRequest();
 req.deleteById(byIdsList);
-req.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(expectedRfByIds));
+maybeAddMinRfExplicitly(expectedRfByIds, req);
 sendNonDirectUpdateRequestReplicaWithRetry(rep, req, expectedRfByIds, coll);
 
 //Delete the docs by query indicated.
 req = new UpdateRequest();
 req.deleteByQuery("id:(" + StringUtils.join(byQueriesSet, " OR ") + ")");
-req.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(expectedRfDBQ));
+maybeAddMinRfExplicitly(expectedRfDBQ, req);
 sendNonDirectUpdateRequestReplicaWithRetry(rep, req, expectedRfDBQ, coll);
 
 }
@@ -261,6 +260,12 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
 // Note that this also tests if we're wonky and return an achieved rf greater than the number of live replicas.
 assertTrue("Expected rf="+expectedRf+" for batch but got "+
 batchRf + "; clusterState: " + printClusterStateInfo(), batchRf == expectedRf);
+if (up.getParams() != null && up.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
+// If "min_rf" was specified in the request, it must be in the response for back compatibility
+assertNotNull("Expecting min_rf to be in the response, since it was explicitly set in the request", hdr.get(UpdateRequest.MIN_REPFACT));
+assertEquals("Unexpected min_rf in the response",
+Integer.parseInt(up.getParams().get(UpdateRequest.MIN_REPFACT)), hdr.get(UpdateRequest.MIN_REPFACT));
+}
 }
 }
 
@@ -282,7 +287,7 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
 int rf = sendDoc(1, minRf);
 assertRf(3, "all replicas should be active", rf);
 
-// Uses cloudClient do do it's work
+// Uses cloudClient to do it's work
 doDBIdWithRetry(3, 5, "deletes should have propagated to all 3 replicas", 1);
 doDBQWithRetry(3, 5, "deletes should have propagated to all 3 replicas", 1);
 
@@ -292,7 +297,7 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
 rf = sendDoc(2, minRf);
 assertRf(2, "one replica should be down", rf);
 
-// Uses cloudClient do do it's work
+// Uses cloudClient to do it's work
 doDBQWithRetry(2, 5, "deletes should have propagated to 2 replicas", 1);
 doDBIdWithRetry(2, 5, "deletes should have propagated to 2 replicas", 1);
 
@@ -398,8 +403,8 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
 addDocs(docIds, expectedRf, retries);
 UpdateRequest req = new UpdateRequest();
 req.deleteByQuery("id:(" + StringUtils.join(docIds, " OR ") + ")");
-req.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(expectedRf));
-doDelete(req, msg, expectedRf, retries);
+boolean minRfExplicit = maybeAddMinRfExplicitly(expectedRf, req);
+doDelete(req, msg, expectedRf, retries, minRfExplicit);
 }
 
 protected void doDBIdWithRetry(int expectedRf, int retries, String msg, int docsToAdd) throws Exception {
@@ -407,14 +412,18 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
 addDocs(docIds, expectedRf, retries);
 UpdateRequest req = new UpdateRequest();
 req.deleteById(StringUtils.join(docIds, ","));
-req.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(expectedRf));
-doDelete(req, msg, expectedRf, retries);
+boolean minRfExplicit = maybeAddMinRfExplicitly(expectedRf, req);
+doDelete(req, msg, expectedRf, retries, minRfExplicit);
 }
 
-protected void doDelete(UpdateRequest req, String msg, int expectedRf, int retries) throws IOException, SolrServerException, InterruptedException {
+protected void doDelete(UpdateRequest req, String msg, int expectedRf, int retries, boolean minRfExplicit) throws IOException, SolrServerException, InterruptedException {
 int achievedRf = -1;
 for (int idx = 0; idx < retries; ++idx) {
-achievedRf = cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(req));
+NamedList<Object> response = cloudClient.request(req);
+if (minRfExplicit) {
+assertMinRfInResponse(expectedRf, response);
+}
+achievedRf = cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), response);
 if (achievedRf == expectedRf) return;
 Thread.sleep(1000);
 }
@@ -423,12 +432,36 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
 
 protected int sendDoc(int docId, int minRf) throws Exception {
 UpdateRequest up = new UpdateRequest();
-up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
+boolean minRfExplicit = maybeAddMinRfExplicitly(minRf, up);
 SolrInputDocument doc = new SolrInputDocument();
 doc.addField(id, String.valueOf(docId));
 doc.addField("a_t", "hello" + docId);
 up.add(doc);
-return cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
+return runAndGetAchievedRf(up, minRfExplicit, minRf);
 }
 
+private int runAndGetAchievedRf(UpdateRequest up, boolean minRfExplicit, int minRf) throws SolrServerException, IOException {
+NamedList<Object> response = cloudClient.request(up);
+if (minRfExplicit) {
+assertMinRfInResponse(minRf, response);
+}
+return cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), response);
+}
+
+private void assertMinRfInResponse(int minRf, NamedList<Object> response) {
+Object minRfFromResponse = response.findRecursive("responseHeader", UpdateRequest.MIN_REPFACT);
+assertNotNull("Expected min_rf header in the response", minRfFromResponse);
+assertEquals("Unexpected min_rf in response", ((Integer)minRfFromResponse).intValue(), minRf);
+}
+
+private boolean maybeAddMinRfExplicitly(int minRf, UpdateRequest up) {
+boolean minRfExplicit = false;
+if (rarely()) {
+// test back compat behavior. Remove in Solr 9
+up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
+minRfExplicit = true;
+}
+return minRfExplicit;
+}
+
 protected void assertRf(int expected, String explain, int actual) throws Exception {
@@ -461,8 +461,8 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
 AddUpdateCommand cmd = new AddUpdateCommand(null);
 cmd.solrDoc = sdoc("id", id.incrementAndGet());
 ModifiableSolrParams params = new ModifiableSolrParams();
-RollupRequestReplicationTracker rollupReqTracker = new RollupRequestReplicationTracker("2");
-LeaderRequestReplicationTracker leaderReqTracker = new LeaderRequestReplicationTracker("shard1", 2);
+RollupRequestReplicationTracker rollupReqTracker = new RollupRequestReplicationTracker();
+LeaderRequestReplicationTracker leaderReqTracker = new LeaderRequestReplicationTracker("shard1");
 
 cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReqTracker, leaderReqTracker);
 cmdDistrib.finish();
@@ -36,6 +36,8 @@ If an update fails because cores are reloading schemas and some have finished bu
 
 When using a replication factor greater than one, an update request may succeed on the shard leader but fail on one or more of the replicas. For instance, consider a collection with one shard and a replication factor of three. In this case, you have a shard leader and two additional replicas. If an update request succeeds on the leader but fails on both replicas, for whatever reason, the update request is still considered successful from the perspective of the client. The replicas that missed the update will sync with the leader when they recover.
 
-Behind the scenes, this means that Solr has accepted updates that are only on one of the nodes (the current leader). Solr supports the optional `min_rf` parameter on update requests that cause the server to return the achieved replication factor for an update request in the response. For the example scenario described above, if the client application included `min_rf >= 1`, then Solr would return `rf=1` in the Solr response header because the request only succeeded on the leader. The update request will still be accepted as the `min_rf` parameter only tells Solr that the client application wishes to know what the achieved replication factor was for the update request. In other words, `min_rf` does not mean Solr will enforce a minimum replication factor as Solr does not support rolling back updates that succeed on a subset of replicas.
+Behind the scenes, this means that Solr has accepted updates that are only on one of the nodes (the current leader). To make the client aware of this, Solr includes in the response header the "Achieved Replication Factor" (`rf`). The achieved replication factor is the number of replicas of the shard that actually received the update request (including the leader), in the above example, 1. In the case of multi-shard update requests, the achieved replication factor is the minimum achieved replication factor across all shards.
 
-On the client side, if the achieved replication factor is less than the acceptable level, then the client application can take additional measures to handle the degraded state. For instance, a client application may want to keep a log of which update requests were sent while the state of the collection was degraded and then resend the updates once the problem has been resolved. In short, `min_rf` is an optional mechanism for a client application to be warned that an update request was accepted while the collection is in a degraded state.
+On the client side, if the achieved replication factor is less than the acceptable level, then the client application can take additional measures to handle the degraded state. For instance, a client application may want to keep a log of which update requests were sent while the state of the collection was degraded and then resend the updates once the problem has been resolved.
+
+NOTE: In previous version of Solr, the `min_rf` parameter had to be specified to ask Solr for the achieved replication factor. Now it is always included in the response.
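To make the client-side suggestion in the documentation above concrete, here is a minimal sketch of the "log and resend later" idea. It is not part of this commit: the acceptable-rf threshold and the degraded log are hypothetical application-side choices, and only the `rf` response header and the SolrJ calls shown (CloudSolrClient.request, getMinAchievedReplicationFactor) come from Solr.

import java.io.IOException;
import java.util.List;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.util.NamedList;

public class DegradedRfHandler {
  private final CloudSolrClient client;
  private final String collection;
  private final int acceptableRf;                // application-chosen threshold, not a Solr setting
  private final List<UpdateRequest> degradedLog; // hypothetical application-side log of updates to resend

  public DegradedRfHandler(CloudSolrClient client, String collection,
                           int acceptableRf, List<UpdateRequest> degradedLog) {
    this.client = client;
    this.collection = collection;
    this.acceptableRf = acceptableRf;
    this.degradedLog = degradedLog;
  }

  /** Sends the update and remembers it for a later resend if the achieved rf is too low. */
  public int sendAndTrack(UpdateRequest up) throws SolrServerException, IOException {
    NamedList<Object> rsp = client.request(up, collection);
    int achievedRf = client.getMinAchievedReplicationFactor(collection, rsp);
    if (achievedRf < acceptableRf) {
      // Solr accepted the update even though replication was degraded, so the
      // application records it and resends once the collection has recovered.
      degradedLog.add(up);
    }
    return achievedRf;
  }
}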
@@ -54,6 +54,10 @@ import static org.apache.solr.common.params.ShardParams._ROUTE_;
 public class UpdateRequest extends AbstractUpdateRequest {
 
 public static final String REPFACT = "rf";
+/**
+ * @deprecated Solr now always includes in the response the {@link #REPFACT}, this parameter
+ * doesn't need to be explicitly set
+ */
 public static final String MIN_REPFACT = "min_rf";
 public static final String VER = "ver";
 public static final String OVERWRITE = "ow";
@@ -922,6 +922,8 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
 protected void compareSolrResponses(SolrResponse a, SolrResponse b) {
 // SOLR-3345: Checking QTime value can be skipped as there is no guarantee that the numbers will match.
 handle.put("QTime", SKIPVAL);
+// rf will be different since the control collection doesn't usually have multiple replicas
+handle.put("rf", SKIPVAL);
 String cmp = compare(a.getResponse(), b.getResponse(), flags, handle);
 if (cmp != null) {
 log.error("Mismatched responses:\n" + a + "\n" + b);
@@ -794,7 +794,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
 @SuppressWarnings("rawtypes")
 protected static int sendDocsWithRetry(CloudSolrClient cloudClient, String collection, List<SolrInputDocument> batch, int minRf, int maxRetries, int waitBeforeRetry) throws Exception {
 UpdateRequest up = new UpdateRequest();
-up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
 up.add(batch);
 NamedList resp = null;
 int numRetries = 0;