From cc62b9fac2302b8db627490efb88482ff6bbde54 Mon Sep 17 00:00:00 2001 From: Yonik Seeley Date: Fri, 11 Oct 2019 15:07:03 -0400 Subject: [PATCH] SOLR-13815: fix live split data loss due to cluster state change between checking current shard state and getting list of subShards (#920) * SOLR-13815: add simple live split test to help debugging possible issue * SOLR-13815: fix live split data loss due to cluster state change berween checking current shard state and getting list of subShards --- solr/CHANGES.txt | 4 + .../DistributedZkUpdateProcessor.java | 46 ++++-- .../org/apache/solr/cloud/SplitShardTest.java | 138 ++++++++++++++++++ 3 files changed, 174 insertions(+), 14 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index ac049a475c2..8f11d5d2afc 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -252,6 +252,10 @@ Bug Fixes * SOLR-13829: RecursiveEvaluator casts Continuous numbers to Discrete Numbers, causing mismatch (Trey Grainger, Joel Bernstein) +* SOLR-13815: Live shard split (where updates actively continue during the split) can lose updates due to cluster + state happening to change between checking if the current shard is active and later checking if there are any + sub-shard leaders to forward the update to. (yonik) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java index 22e6956f15d..a76b6be2aac 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java @@ -83,6 +83,16 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { private final String collection; private boolean readOnlyCollection = false; + // The cached immutable clusterState for the update... usually refreshed for each individual update. + // Different parts of this class used to request current clusterState views, which lead to subtle bugs and race conditions + // such as SOLR-13815 (live split data loss.) Most likely, the only valid reasons for updating clusterState should be on + // certain types of failure + retry. + // Note: there may be other races related to + // 1) cluster topology change across multiple adds + // 2) use of methods directly on zkController that use a different clusterState + // 3) in general, not controlling carefully enough exactly when our view of clusterState is updated + protected ClusterState clusterState; + // should we clone the document before sending it to replicas? // this is set to true in the constructor if the next processors in the chain // are custom and may modify the SolrInputDocument racing with its serialization for replication @@ -103,7 +113,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler()); cloneRequiredOnLeader = isCloneRequiredOnLeader(next); collection = cloudDesc.getCollectionName(); - DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection); + clusterState = zkController.getClusterState(); + DocCollection coll = clusterState.getCollectionOrNull(collection); if (coll != null) { // check readOnly property in coll state readOnlyCollection = coll.isReadOnly(); @@ -138,6 +149,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { @Override public void processCommit(CommitUpdateCommand cmd) throws IOException { + clusterState = zkController.getClusterState(); assert TestInjection.injectFailUpdateRequests(); @@ -216,6 +228,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { @Override public void processAdd(AddUpdateCommand cmd) throws IOException { + clusterState = zkController.getClusterState(); + assert TestInjection.injectFailUpdateRequests(); if (isReadOnly()) { @@ -235,7 +249,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { protected void doDistribAdd(AddUpdateCommand cmd) throws IOException { if (isLeader && !isSubShardLeader) { - DocCollection coll = zkController.getClusterState().getCollection(collection); + DocCollection coll = clusterState.getCollection(collection); List subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument()); // the list will actually have only one element for an add request if (subShardLeaders != null && !subShardLeaders.isEmpty()) { @@ -246,7 +260,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId()); cmdDistrib.distribAdd(cmd, subShardLeaders, params, true); } - final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument()); + final List nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument()); if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); @@ -290,6 +304,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { @Override public void processDelete(DeleteUpdateCommand cmd) throws IOException { + clusterState = zkController.getClusterState(); + if (isReadOnly()) { throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); } @@ -311,7 +327,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { @Override protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException { if (isLeader && !isSubShardLeader) { - DocCollection coll = zkController.getClusterState().getCollection(collection); + DocCollection coll = clusterState.getCollection(collection); List subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null); // the list will actually have only one element for an add request if (subShardLeaders != null && !subShardLeaders.isEmpty()) { @@ -323,7 +339,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null); } - final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null); + final List nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getId(), null); if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); @@ -366,7 +382,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { // - log + execute the local DBQ DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); - DocCollection coll = zkController.getClusterState().getCollection(collection); + DocCollection coll = clusterState.getCollection(collection); if (DistribPhase.NONE == phase) { if (rollupReplicationTracker == null) { @@ -485,7 +501,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { if (subShardLeaders != null) { cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker); } - final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null); + final List nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, null, null); if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); @@ -588,8 +604,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { return null; } - ClusterState cstate = zkController.getClusterState(); - DocCollection coll = cstate.getCollection(collection); + clusterState = zkController.getClusterState(); + DocCollection coll = clusterState.getCollection(collection); Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll); if (slice == null) { @@ -650,7 +666,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { // that means I want to forward onto my replicas... // so get the replicas... forwardToLeader = false; - ClusterState clusterState = zkController.getZkStateReader().getClusterState(); String leaderCoreNodeName = leaderReplica.getName(); List replicas = clusterState.getCollection(collection) .getSlice(shardId) @@ -733,7 +748,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { private List getCollectionUrls(String collection, EnumSet types, boolean onlyLeaders) { - ClusterState clusterState = zkController.getClusterState(); final DocCollection docCollection = clusterState.getCollectionOrNull(collection); if (collection == null || docCollection.getSlicesMap() == null) { throw new ZooKeeperException(SolrException.ErrorCode.BAD_REQUEST, @@ -804,7 +818,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { } protected List getReplicaNodesForLeader(String shardId, Replica leaderReplica) { - ClusterState clusterState = zkController.getZkStateReader().getClusterState(); String leaderCoreNodeName = leaderReplica.getName(); List replicas = clusterState.getCollection(collection) .getSlice(shardId) @@ -858,7 +871,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) { Replica sliceLeader = aslice.getLeader(); // slice leader can be null because node/shard is created zk before leader election - if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) { + if (sliceLeader != null && clusterState.liveNodesContain(sliceLeader.getNodeName())) { if (nodes == null) nodes = new ArrayList<>(); ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader); nodes.add(new SolrCmdDistributor.StdNode(nodeProps, coll.getName(), aslice.getName())); @@ -955,7 +968,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { if (isReplayOrPeersync) return; String from = req.getParams().get(DISTRIB_FROM); - ClusterState clusterState = zkController.getClusterState(); DocCollection docCollection = clusterState.getCollection(collection); Slice mySlice = docCollection.getSlice(cloudDesc.getShardId()); @@ -1015,6 +1027,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { @Override public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { + clusterState = zkController.getClusterState(); + if (isReadOnly()) { throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); } @@ -1023,6 +1037,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { @Override public void processRollback(RollbackUpdateCommand cmd) throws IOException { + clusterState = zkController.getClusterState(); + if (isReadOnly()) { throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); } @@ -1031,6 +1047,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { @Override public void finish() throws IOException { + clusterState = zkController.getClusterState(); + assertNotFinished(); doFinish(); diff --git a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java index 9d4b74cb96a..71ff72c0670 100644 --- a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java @@ -18,19 +18,32 @@ package org.apache.solr.cloud; import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SplitShardTest extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final String COLLECTION_NAME = "splitshardtest-collection"; @@ -133,4 +146,129 @@ public class SplitShardTest extends SolrCloudTestCase { assertEquals("wrong range in s1_1", expected1, delta1); } + + CloudSolrClient createCollection(String collectionName, int repFactor) throws Exception { + + CollectionAdminRequest + .createCollection(collectionName, "conf", 1, repFactor) + .setMaxShardsPerNode(100) + .process(cluster.getSolrClient()); + + cluster.waitForActiveCollection(collectionName, 1, repFactor); + + CloudSolrClient client = cluster.getSolrClient(); + client.setDefaultCollection(collectionName); + return client; + } + + + long getNumDocs(CloudSolrClient client) throws Exception { + String collectionName = client.getDefaultCollection(); + DocCollection collection = client.getZkStateReader().getClusterState().getCollection(collectionName); + Collection slices = collection.getSlices(); + + long totCount = 0; + for (Slice slice : slices) { + if (!slice.getState().equals(Slice.State.ACTIVE)) continue; + long lastReplicaCount = -1; + for (Replica replica : slice.getReplicas()) { + SolrClient replicaClient = getHttpSolrClient(replica.getBaseUrl() + "/" + replica.getCoreName()); + long numFound = 0; + try { + numFound = replicaClient.query(params("q", "*:*", "distrib", "false")).getResults().getNumFound(); + log.info("Replica count=" + numFound + " for " + replica); + } finally { + replicaClient.close(); + } + if (lastReplicaCount >= 0) { + assertEquals("Replica doc count for " + replica, lastReplicaCount, numFound); + } + lastReplicaCount = numFound; + } + totCount += lastReplicaCount; + } + + + long cloudClientDocs = client.query(new SolrQuery("*:*")).getResults().getNumFound(); + assertEquals("Sum of shard count should equal distrib query doc count", totCount, cloudClientDocs); + return totCount; + } + + + void doLiveSplitShard(String collectionName, int repFactor) throws Exception { + final CloudSolrClient client = createCollection(collectionName, repFactor); + + final AtomicBoolean doIndex = new AtomicBoolean(true); + final AtomicInteger docsIndexed = new AtomicInteger(); + Thread indexThread = null; + try { + // start indexing client before we initiate a shard split + indexThread = new Thread(() -> { + while (doIndex.get()) { + try { + // Thread.sleep(10); // uncomment this to cap indexing rate at 100 docs per second... + int currDoc = docsIndexed.get(); + + // Try all docs in the same update request + UpdateRequest updateReq = new UpdateRequest(); + updateReq.add(sdoc("id", "doc_" + currDoc)); + UpdateResponse ursp = updateReq.commit(client, collectionName); + assertEquals(0, ursp.getStatus()); // for now, don't accept any failures + if (ursp.getStatus() == 0) { + docsIndexed.incrementAndGet(); + } + } catch (Exception e) { + fail(e.getMessage()); + break; + } + } + }); + indexThread.start(); + + Thread.sleep(100); // wait for a few docs to be indexed before invoking split + int docCount = docsIndexed.get(); + + CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName) + .setShardName("shard1"); + splitShard.process(client); + waitForState("Timed out waiting for sub shards to be active.", + collectionName, activeClusterShape(2, 3*repFactor)); // 2 repFactor for the new split shards, 1 repFactor for old replicas + + // make sure that docs were able to be indexed during the split + assertTrue(docsIndexed.get() > docCount); + + Thread.sleep(100); // wait for a few more docs to be indexed after split + + } finally { + // shut down the indexer + doIndex.set(false); + if (indexThread != null) { + indexThread.join(); + } + } + + assertTrue(docsIndexed.get() > 0); + + long numDocs = getNumDocs(client); + if (numDocs != docsIndexed.get()) { + // Find out what docs are missing. + for (int i = 0; i < docsIndexed.get(); i++) { + String id = "doc_" + i; + long cloudClientDocs = client.query(new SolrQuery("id:" + id)).getResults().getNumFound(); + if (cloudClientDocs != 1) { + log.error("MISSING DOCUMENT " + id); + } + } + } + + assertEquals("Documents are missing!", docsIndexed.get(), numDocs); + log.info("Number of documents indexed and queried : " + numDocs); + } + + @Test + public void testLiveSplit() throws Exception { + doLiveSplitShard("livesplit1", 1); + } + + }