mirror of https://github.com/apache/lucene.git
SOLR-13815: fix live split data loss due to cluster state change between checking current shard state and getting list of subShards (#920)
* SOLR-13815: add simple live split test to help debugging possible issue * SOLR-13815: fix live split data loss due to cluster state change berween checking current shard state and getting list of subShards
This commit is contained in:
parent
0295e281d0
commit
a057b0d159
|
@ -314,6 +314,10 @@ Bug Fixes
|
|||
|
||||
* SOLR-13829: RecursiveEvaluator casts Continuous numbers to Discrete Numbers, causing mismatch (Trey Grainger, Joel Bernstein)
|
||||
|
||||
* SOLR-13815: Live shard split (where updates actively continue during the split) can lose updates due to cluster
|
||||
state happening to change between checking if the current shard is active and later checking if there are any
|
||||
sub-shard leaders to forward the update to. (yonik)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -83,6 +83,16 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
private final String collection;
|
||||
private boolean readOnlyCollection = false;
|
||||
|
||||
// The cached immutable clusterState for the update... usually refreshed for each individual update.
|
||||
// Different parts of this class used to request current clusterState views, which lead to subtle bugs and race conditions
|
||||
// such as SOLR-13815 (live split data loss.) Most likely, the only valid reasons for updating clusterState should be on
|
||||
// certain types of failure + retry.
|
||||
// Note: there may be other races related to
|
||||
// 1) cluster topology change across multiple adds
|
||||
// 2) use of methods directly on zkController that use a different clusterState
|
||||
// 3) in general, not controlling carefully enough exactly when our view of clusterState is updated
|
||||
protected ClusterState clusterState;
|
||||
|
||||
// should we clone the document before sending it to replicas?
|
||||
// this is set to true in the constructor if the next processors in the chain
|
||||
// are custom and may modify the SolrInputDocument racing with its serialization for replication
|
||||
|
@ -103,7 +113,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
|
||||
cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
|
||||
collection = cloudDesc.getCollectionName();
|
||||
DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection);
|
||||
clusterState = zkController.getClusterState();
|
||||
DocCollection coll = clusterState.getCollectionOrNull(collection);
|
||||
if (coll != null) {
|
||||
// check readOnly property in coll state
|
||||
readOnlyCollection = coll.isReadOnly();
|
||||
|
@ -138,6 +149,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
|
||||
@Override
|
||||
public void processCommit(CommitUpdateCommand cmd) throws IOException {
|
||||
clusterState = zkController.getClusterState();
|
||||
|
||||
assert TestInjection.injectFailUpdateRequests();
|
||||
|
||||
|
@ -216,6 +228,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
|
||||
@Override
|
||||
public void processAdd(AddUpdateCommand cmd) throws IOException {
|
||||
clusterState = zkController.getClusterState();
|
||||
|
||||
assert TestInjection.injectFailUpdateRequests();
|
||||
|
||||
if (isReadOnly()) {
|
||||
|
@ -235,7 +249,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
|
||||
|
||||
if (isLeader && !isSubShardLeader) {
|
||||
DocCollection coll = zkController.getClusterState().getCollection(collection);
|
||||
DocCollection coll = clusterState.getCollection(collection);
|
||||
List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
|
||||
// the list<node> will actually have only one element for an add request
|
||||
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
|
||||
|
@ -246,7 +260,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
|
||||
cmdDistrib.distribAdd(cmd, subShardLeaders, params, true);
|
||||
}
|
||||
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
|
||||
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
|
||||
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
|
||||
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
|
||||
|
@ -290,6 +304,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
|
||||
@Override
|
||||
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
|
||||
clusterState = zkController.getClusterState();
|
||||
|
||||
if (isReadOnly()) {
|
||||
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
|
||||
}
|
||||
|
@ -311,7 +327,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
@Override
|
||||
protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException {
|
||||
if (isLeader && !isSubShardLeader) {
|
||||
DocCollection coll = zkController.getClusterState().getCollection(collection);
|
||||
DocCollection coll = clusterState.getCollection(collection);
|
||||
List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null);
|
||||
// the list<node> will actually have only one element for an add request
|
||||
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
|
||||
|
@ -323,7 +339,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null);
|
||||
}
|
||||
|
||||
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null);
|
||||
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getId(), null);
|
||||
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
|
||||
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
|
||||
|
@ -366,7 +382,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
// - log + execute the local DBQ
|
||||
DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
|
||||
|
||||
DocCollection coll = zkController.getClusterState().getCollection(collection);
|
||||
DocCollection coll = clusterState.getCollection(collection);
|
||||
|
||||
if (DistribPhase.NONE == phase) {
|
||||
if (rollupReplicationTracker == null) {
|
||||
|
@ -485,7 +501,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
if (subShardLeaders != null) {
|
||||
cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker);
|
||||
}
|
||||
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null);
|
||||
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, null, null);
|
||||
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
|
||||
params = new ModifiableSolrParams(filterParams(req.getParams()));
|
||||
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
|
||||
|
@ -588,8 +604,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
return null;
|
||||
}
|
||||
|
||||
ClusterState cstate = zkController.getClusterState();
|
||||
DocCollection coll = cstate.getCollection(collection);
|
||||
clusterState = zkController.getClusterState();
|
||||
DocCollection coll = clusterState.getCollection(collection);
|
||||
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
|
||||
|
||||
if (slice == null) {
|
||||
|
@ -650,7 +666,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
// that means I want to forward onto my replicas...
|
||||
// so get the replicas...
|
||||
forwardToLeader = false;
|
||||
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
|
||||
String leaderCoreNodeName = leaderReplica.getName();
|
||||
List<Replica> replicas = clusterState.getCollection(collection)
|
||||
.getSlice(shardId)
|
||||
|
@ -733,7 +748,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
|
||||
|
||||
private List<SolrCmdDistributor.Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
|
||||
ClusterState clusterState = zkController.getClusterState();
|
||||
final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
|
||||
if (collection == null || docCollection.getSlicesMap() == null) {
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
|
@ -804,7 +818,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
}
|
||||
|
||||
protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
|
||||
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
|
||||
String leaderCoreNodeName = leaderReplica.getName();
|
||||
List<Replica> replicas = clusterState.getCollection(collection)
|
||||
.getSlice(shardId)
|
||||
|
@ -858,7 +871,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
|| coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) {
|
||||
Replica sliceLeader = aslice.getLeader();
|
||||
// slice leader can be null because node/shard is created zk before leader election
|
||||
if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) {
|
||||
if (sliceLeader != null && clusterState.liveNodesContain(sliceLeader.getNodeName())) {
|
||||
if (nodes == null) nodes = new ArrayList<>();
|
||||
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
|
||||
nodes.add(new SolrCmdDistributor.StdNode(nodeProps, coll.getName(), aslice.getName()));
|
||||
|
@ -955,7 +968,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
if (isReplayOrPeersync) return;
|
||||
|
||||
String from = req.getParams().get(DISTRIB_FROM);
|
||||
ClusterState clusterState = zkController.getClusterState();
|
||||
|
||||
DocCollection docCollection = clusterState.getCollection(collection);
|
||||
Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());
|
||||
|
@ -1015,6 +1027,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
|
||||
@Override
|
||||
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
|
||||
clusterState = zkController.getClusterState();
|
||||
|
||||
if (isReadOnly()) {
|
||||
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
|
||||
}
|
||||
|
@ -1023,6 +1037,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
|
||||
@Override
|
||||
public void processRollback(RollbackUpdateCommand cmd) throws IOException {
|
||||
clusterState = zkController.getClusterState();
|
||||
|
||||
if (isReadOnly()) {
|
||||
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
|
||||
}
|
||||
|
@ -1031,6 +1047,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|
|||
|
||||
@Override
|
||||
public void finish() throws IOException {
|
||||
clusterState = zkController.getClusterState();
|
||||
|
||||
assertNotFinished();
|
||||
|
||||
doFinish();
|
||||
|
|
|
@ -18,19 +18,32 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.UpdateResponse;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class SplitShardTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final String COLLECTION_NAME = "splitshardtest-collection";
|
||||
|
||||
|
@ -133,4 +146,129 @@ public class SplitShardTest extends SolrCloudTestCase {
|
|||
assertEquals("wrong range in s1_1", expected1, delta1);
|
||||
}
|
||||
|
||||
|
||||
CloudSolrClient createCollection(String collectionName, int repFactor) throws Exception {
|
||||
|
||||
CollectionAdminRequest
|
||||
.createCollection(collectionName, "conf", 1, repFactor)
|
||||
.setMaxShardsPerNode(100)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
cluster.waitForActiveCollection(collectionName, 1, repFactor);
|
||||
|
||||
CloudSolrClient client = cluster.getSolrClient();
|
||||
client.setDefaultCollection(collectionName);
|
||||
return client;
|
||||
}
|
||||
|
||||
|
||||
long getNumDocs(CloudSolrClient client) throws Exception {
|
||||
String collectionName = client.getDefaultCollection();
|
||||
DocCollection collection = client.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||
Collection<Slice> slices = collection.getSlices();
|
||||
|
||||
long totCount = 0;
|
||||
for (Slice slice : slices) {
|
||||
if (!slice.getState().equals(Slice.State.ACTIVE)) continue;
|
||||
long lastReplicaCount = -1;
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
SolrClient replicaClient = getHttpSolrClient(replica.getBaseUrl() + "/" + replica.getCoreName());
|
||||
long numFound = 0;
|
||||
try {
|
||||
numFound = replicaClient.query(params("q", "*:*", "distrib", "false")).getResults().getNumFound();
|
||||
log.info("Replica count=" + numFound + " for " + replica);
|
||||
} finally {
|
||||
replicaClient.close();
|
||||
}
|
||||
if (lastReplicaCount >= 0) {
|
||||
assertEquals("Replica doc count for " + replica, lastReplicaCount, numFound);
|
||||
}
|
||||
lastReplicaCount = numFound;
|
||||
}
|
||||
totCount += lastReplicaCount;
|
||||
}
|
||||
|
||||
|
||||
long cloudClientDocs = client.query(new SolrQuery("*:*")).getResults().getNumFound();
|
||||
assertEquals("Sum of shard count should equal distrib query doc count", totCount, cloudClientDocs);
|
||||
return totCount;
|
||||
}
|
||||
|
||||
|
||||
void doLiveSplitShard(String collectionName, int repFactor) throws Exception {
|
||||
final CloudSolrClient client = createCollection(collectionName, repFactor);
|
||||
|
||||
final AtomicBoolean doIndex = new AtomicBoolean(true);
|
||||
final AtomicInteger docsIndexed = new AtomicInteger();
|
||||
Thread indexThread = null;
|
||||
try {
|
||||
// start indexing client before we initiate a shard split
|
||||
indexThread = new Thread(() -> {
|
||||
while (doIndex.get()) {
|
||||
try {
|
||||
// Thread.sleep(10); // uncomment this to cap indexing rate at 100 docs per second...
|
||||
int currDoc = docsIndexed.get();
|
||||
|
||||
// Try all docs in the same update request
|
||||
UpdateRequest updateReq = new UpdateRequest();
|
||||
updateReq.add(sdoc("id", "doc_" + currDoc));
|
||||
UpdateResponse ursp = updateReq.commit(client, collectionName);
|
||||
assertEquals(0, ursp.getStatus()); // for now, don't accept any failures
|
||||
if (ursp.getStatus() == 0) {
|
||||
docsIndexed.incrementAndGet();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
fail(e.getMessage());
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
indexThread.start();
|
||||
|
||||
Thread.sleep(100); // wait for a few docs to be indexed before invoking split
|
||||
int docCount = docsIndexed.get();
|
||||
|
||||
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
|
||||
.setShardName("shard1");
|
||||
splitShard.process(client);
|
||||
waitForState("Timed out waiting for sub shards to be active.",
|
||||
collectionName, activeClusterShape(2, 3*repFactor)); // 2 repFactor for the new split shards, 1 repFactor for old replicas
|
||||
|
||||
// make sure that docs were able to be indexed during the split
|
||||
assertTrue(docsIndexed.get() > docCount);
|
||||
|
||||
Thread.sleep(100); // wait for a few more docs to be indexed after split
|
||||
|
||||
} finally {
|
||||
// shut down the indexer
|
||||
doIndex.set(false);
|
||||
if (indexThread != null) {
|
||||
indexThread.join();
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(docsIndexed.get() > 0);
|
||||
|
||||
long numDocs = getNumDocs(client);
|
||||
if (numDocs != docsIndexed.get()) {
|
||||
// Find out what docs are missing.
|
||||
for (int i = 0; i < docsIndexed.get(); i++) {
|
||||
String id = "doc_" + i;
|
||||
long cloudClientDocs = client.query(new SolrQuery("id:" + id)).getResults().getNumFound();
|
||||
if (cloudClientDocs != 1) {
|
||||
log.error("MISSING DOCUMENT " + id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals("Documents are missing!", docsIndexed.get(), numDocs);
|
||||
log.info("Number of documents indexed and queried : " + numDocs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLiveSplit() throws Exception {
|
||||
doLiveSplitShard("livesplit1", 1);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue