SOLR-13815: fix live split data loss due to cluster state change between checking current shard state and getting list of subShards (#920)

* SOLR-13815: add simple live split test to help debugging possible issue

* SOLR-13815: fix live split data loss due to cluster state change berween checking current shard state and getting list of subShards
This commit is contained in:
Yonik Seeley 2019-10-11 15:07:03 -04:00 committed by yonik
parent a9c7750402
commit cc62b9fac2
3 changed files with 174 additions and 14 deletions

View File

@ -252,6 +252,10 @@ Bug Fixes
* SOLR-13829: RecursiveEvaluator casts Continuous numbers to Discrete Numbers, causing mismatch (Trey Grainger, Joel Bernstein)
* SOLR-13815: Live shard split (where updates actively continue during the split) can lose updates due to cluster
state happening to change between checking if the current shard is active and later checking if there are any
sub-shard leaders to forward the update to. (yonik)
Other Changes
----------------------

View File

@ -83,6 +83,16 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
private final String collection;
private boolean readOnlyCollection = false;
// The cached immutable clusterState for the update... usually refreshed for each individual update.
// Different parts of this class used to request current clusterState views, which lead to subtle bugs and race conditions
// such as SOLR-13815 (live split data loss.) Most likely, the only valid reasons for updating clusterState should be on
// certain types of failure + retry.
// Note: there may be other races related to
// 1) cluster topology change across multiple adds
// 2) use of methods directly on zkController that use a different clusterState
// 3) in general, not controlling carefully enough exactly when our view of clusterState is updated
protected ClusterState clusterState;
// should we clone the document before sending it to replicas?
// this is set to true in the constructor if the next processors in the chain
// are custom and may modify the SolrInputDocument racing with its serialization for replication
@ -103,7 +113,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
collection = cloudDesc.getCollectionName();
DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection);
clusterState = zkController.getClusterState();
DocCollection coll = clusterState.getCollectionOrNull(collection);
if (coll != null) {
// check readOnly property in coll state
readOnlyCollection = coll.isReadOnly();
@ -138,6 +149,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
clusterState = zkController.getClusterState();
assert TestInjection.injectFailUpdateRequests();
@ -216,6 +228,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
clusterState = zkController.getClusterState();
assert TestInjection.injectFailUpdateRequests();
if (isReadOnly()) {
@ -235,7 +249,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
if (isLeader && !isSubShardLeader) {
DocCollection coll = zkController.getClusterState().getCollection(collection);
DocCollection coll = clusterState.getCollection(collection);
List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
// the list<node> will actually have only one element for an add request
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
@ -246,7 +260,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
cmdDistrib.distribAdd(cmd, subShardLeaders, params, true);
}
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@ -290,6 +304,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
clusterState = zkController.getClusterState();
if (isReadOnly()) {
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
@ -311,7 +327,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException {
if (isLeader && !isSubShardLeader) {
DocCollection coll = zkController.getClusterState().getCollection(collection);
DocCollection coll = clusterState.getCollection(collection);
List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null);
// the list<node> will actually have only one element for an add request
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
@ -323,7 +339,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null);
}
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null);
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getId(), null);
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@ -366,7 +382,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// - log + execute the local DBQ
DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
DocCollection coll = zkController.getClusterState().getCollection(collection);
DocCollection coll = clusterState.getCollection(collection);
if (DistribPhase.NONE == phase) {
if (rollupReplicationTracker == null) {
@ -485,7 +501,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (subShardLeaders != null) {
cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker);
}
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null);
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, null, null);
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@ -588,8 +604,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
return null;
}
ClusterState cstate = zkController.getClusterState();
DocCollection coll = cstate.getCollection(collection);
clusterState = zkController.getClusterState();
DocCollection coll = clusterState.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
if (slice == null) {
@ -650,7 +666,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
String leaderCoreNodeName = leaderReplica.getName();
List<Replica> replicas = clusterState.getCollection(collection)
.getSlice(shardId)
@ -733,7 +748,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
private List<SolrCmdDistributor.Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
ClusterState clusterState = zkController.getClusterState();
final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (collection == null || docCollection.getSlicesMap() == null) {
throw new ZooKeeperException(SolrException.ErrorCode.BAD_REQUEST,
@ -804,7 +818,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
String leaderCoreNodeName = leaderReplica.getName();
List<Replica> replicas = clusterState.getCollection(collection)
.getSlice(shardId)
@ -858,7 +871,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|| coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) {
Replica sliceLeader = aslice.getLeader();
// slice leader can be null because node/shard is created zk before leader election
if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) {
if (sliceLeader != null && clusterState.liveNodesContain(sliceLeader.getNodeName())) {
if (nodes == null) nodes = new ArrayList<>();
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
nodes.add(new SolrCmdDistributor.StdNode(nodeProps, coll.getName(), aslice.getName()));
@ -955,7 +968,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (isReplayOrPeersync) return;
String from = req.getParams().get(DISTRIB_FROM);
ClusterState clusterState = zkController.getClusterState();
DocCollection docCollection = clusterState.getCollection(collection);
Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());
@ -1015,6 +1027,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
clusterState = zkController.getClusterState();
if (isReadOnly()) {
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
@ -1023,6 +1037,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processRollback(RollbackUpdateCommand cmd) throws IOException {
clusterState = zkController.getClusterState();
if (isReadOnly()) {
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
@ -1031,6 +1047,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void finish() throws IOException {
clusterState = zkController.getClusterState();
assertNotFinished();
doFinish();

View File

@ -18,19 +18,32 @@
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SplitShardTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String COLLECTION_NAME = "splitshardtest-collection";
@ -133,4 +146,129 @@ public class SplitShardTest extends SolrCloudTestCase {
assertEquals("wrong range in s1_1", expected1, delta1);
}
CloudSolrClient createCollection(String collectionName, int repFactor) throws Exception {
CollectionAdminRequest
.createCollection(collectionName, "conf", 1, repFactor)
.setMaxShardsPerNode(100)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 1, repFactor);
CloudSolrClient client = cluster.getSolrClient();
client.setDefaultCollection(collectionName);
return client;
}
long getNumDocs(CloudSolrClient client) throws Exception {
String collectionName = client.getDefaultCollection();
DocCollection collection = client.getZkStateReader().getClusterState().getCollection(collectionName);
Collection<Slice> slices = collection.getSlices();
long totCount = 0;
for (Slice slice : slices) {
if (!slice.getState().equals(Slice.State.ACTIVE)) continue;
long lastReplicaCount = -1;
for (Replica replica : slice.getReplicas()) {
SolrClient replicaClient = getHttpSolrClient(replica.getBaseUrl() + "/" + replica.getCoreName());
long numFound = 0;
try {
numFound = replicaClient.query(params("q", "*:*", "distrib", "false")).getResults().getNumFound();
log.info("Replica count=" + numFound + " for " + replica);
} finally {
replicaClient.close();
}
if (lastReplicaCount >= 0) {
assertEquals("Replica doc count for " + replica, lastReplicaCount, numFound);
}
lastReplicaCount = numFound;
}
totCount += lastReplicaCount;
}
long cloudClientDocs = client.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals("Sum of shard count should equal distrib query doc count", totCount, cloudClientDocs);
return totCount;
}
void doLiveSplitShard(String collectionName, int repFactor) throws Exception {
final CloudSolrClient client = createCollection(collectionName, repFactor);
final AtomicBoolean doIndex = new AtomicBoolean(true);
final AtomicInteger docsIndexed = new AtomicInteger();
Thread indexThread = null;
try {
// start indexing client before we initiate a shard split
indexThread = new Thread(() -> {
while (doIndex.get()) {
try {
// Thread.sleep(10); // uncomment this to cap indexing rate at 100 docs per second...
int currDoc = docsIndexed.get();
// Try all docs in the same update request
UpdateRequest updateReq = new UpdateRequest();
updateReq.add(sdoc("id", "doc_" + currDoc));
UpdateResponse ursp = updateReq.commit(client, collectionName);
assertEquals(0, ursp.getStatus()); // for now, don't accept any failures
if (ursp.getStatus() == 0) {
docsIndexed.incrementAndGet();
}
} catch (Exception e) {
fail(e.getMessage());
break;
}
}
});
indexThread.start();
Thread.sleep(100); // wait for a few docs to be indexed before invoking split
int docCount = docsIndexed.get();
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
.setShardName("shard1");
splitShard.process(client);
waitForState("Timed out waiting for sub shards to be active.",
collectionName, activeClusterShape(2, 3*repFactor)); // 2 repFactor for the new split shards, 1 repFactor for old replicas
// make sure that docs were able to be indexed during the split
assertTrue(docsIndexed.get() > docCount);
Thread.sleep(100); // wait for a few more docs to be indexed after split
} finally {
// shut down the indexer
doIndex.set(false);
if (indexThread != null) {
indexThread.join();
}
}
assertTrue(docsIndexed.get() > 0);
long numDocs = getNumDocs(client);
if (numDocs != docsIndexed.get()) {
// Find out what docs are missing.
for (int i = 0; i < docsIndexed.get(); i++) {
String id = "doc_" + i;
long cloudClientDocs = client.query(new SolrQuery("id:" + id)).getResults().getNumFound();
if (cloudClientDocs != 1) {
log.error("MISSING DOCUMENT " + id);
}
}
}
assertEquals("Documents are missing!", docsIndexed.get(), numDocs);
log.info("Number of documents indexed and queried : " + numDocs);
}
@Test
public void testLiveSplit() throws Exception {
doLiveSplitShard("livesplit1", 1);
}
}