From 259bc2baf7ce58aa0143fa6a8d43da417506cd63 Mon Sep 17 00:00:00 2001
From: Shalin Shekhar Mangar
Date: Thu, 2 Aug 2018 08:24:32 +0530
Subject: [PATCH] SOLR-12607: Fixed two separate bugs in shard splits which can
 cause data loss. The first case is when using TLOG replicas only, the updates
 forwarded from parent shard leader to the sub-shard leader are written only in
 tlog and not the index. If this happens after the buffered updates have been
 replayed then the updates can never be executed even though they remain in the
 transaction log. The second case is when synchronously forwarding updates to
 the sub-shard leader fails and the underlying errors are not propagated to the
 client

---
 solr/CHANGES.txt                                  |  8 +-
 .../cloud/api/collections/AddReplicaCmd.java      |  2 +-
 .../org/apache/solr/core/CoreContainer.java       |  2 +-
 .../processor/DistributedUpdateProcessor.java     | 12 +--
 .../solr/cloud/ChaosMonkeyShardSplitTest.java     |  9 +-
 .../cloud/api/collections/ShardSplitTest.java     | 88 +++++++------------
 6 files changed, 51 insertions(+), 70 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 49fc7fe07e0..6864ce7a770 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -186,7 +186,13 @@ Bug Fixes
 * SOLR-12477: An update would return a client error(400) if it hit a AlreadyClosedException.
   We now return the error as a server error(500) instead (Jeffery via Varun Thacker)
 
-* SOLR-12606: Fix InfixSuggestersTest.testShutdownDuringBuild() failures. (Steve Rowe) 
+* SOLR-12606: Fix InfixSuggestersTest.testShutdownDuringBuild() failures. (Steve Rowe)
+
+* SOLR-12607: Fixed two separate bugs in shard splits which can cause data loss. The first case is when using TLOG
+  replicas only, the updates forwarded from parent shard leader to the sub-shard leader are written only in tlog and
+  not the index. If this happens after the buffered updates have been replayed then the updates can never be executed
+  even though they remain in the transaction log. The second case is when synchronously forwarding updates to the
+  sub-shard leader fails and the underlying errors are not propagated to the client. (Cao Manh Dat, shalin)
 
 Optimizations
 ----------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 0feeec99d09..c9dbaec509f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -278,7 +278,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
             cloudManager).get(0).nodeName;// TODO: use replica type in this logic too
       }
     }
-    log.info("Node Identified {} for creating new replica", node);
+    log.info("Node Identified {} for creating new replica of shard {}", node, shard);
 
     if (!clusterState.liveNodesContain(node)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8659e04aded..6af7c973ac3 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1776,7 +1776,7 @@ public class CoreContainer {
    * @return true if we were able to successfuly perisist the repaired coreDescriptor, false otherwise.
    *
    * See SOLR-11503, This can be removed when there's no chance we'll need to upgrade a
-   * Solr isntallation createged with legacyCloud=true from 6.6.1 through 7.1
+   * Solr installation created with legacyCloud=true from 6.6.1 through 7.1
    */
   public boolean repairCoreProperty(CoreDescriptor cd, String prop) {
     // So far, coreNodeName is the only property that we need to repair, this may get more complex as other properties
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 8d715a6731a..a21d906776c 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -432,13 +432,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange);
           if (isSubset &&
               (docId == null // in case of deletes
-              || (docId != null && coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll)))) {
+              || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) {
             Replica sliceLeader = aslice.getLeader();
             // slice leader can be null because node/shard is created zk before leader election
             if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) {
               if (nodes == null) nodes = new ArrayList<>();
               ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
-              nodes.add(new StdNode(nodeProps, coll.getName(), shardId));
+              nodes.add(new StdNode(nodeProps, coll.getName(), aslice.getName()));
             }
           }
         }
@@ -896,7 +896,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         // not the leader anymore maybe or the error'd node is not my replica?
         if (!foundErrorNodeInReplicaList) {
           log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+
-              shardId+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
+              cloudDesc.getShardId()+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
               "No request recovery command will be sent!");
           if (!shardId.equals(cloudDesc.getShardId())) {
             // some replicas on other shard did not receive the updates (ex: during splitshard),
@@ -1150,7 +1150,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           }
         }
       }
-      if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+      if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
        cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
      }
    }
@@ -1692,7 +1692,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
      return;
    }

-    if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+    if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
      // TLOG replica not leader, don't write the DBQ to IW
      cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
    }
@@ -1851,7 +1851,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
      }
    }

-    if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+    if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
      cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
    }
  }
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index dd5f9f3007a..0af70c1a8da 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.util.LuceneTestCase.Slow;
@@ -70,6 +72,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
     final List ranges = router.partitionRange(2, shard1Range);
     final int[] docCounts = new int[ranges.size()];
     int numReplicas = shard1.getReplicas().size();
+    final Set documentIds = ConcurrentHashMap.newKeySet(1024);
 
     Thread indexThread = null;
     OverseerRestarter killer = null;
@@ -79,7 +82,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
     try {
       del("*:*");
       for (int id = 0; id < 100; id++) {
-        indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id);
+        indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
       }
       commit();
 
@@ -89,7 +92,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
           int max = atLeast(401);
           for (int id = 101; id < max; id++) {
             try {
-              indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id);
+              indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
               Thread.sleep(atLeast(25));
             } catch (Exception e) {
               log.error("Exception while adding doc", e);
@@ -153,7 +156,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
       }
     }
 
-    checkDocCountsAndShardStates(docCounts, numReplicas);
+    checkDocCountsAndShardStates(docCounts, numReplicas, documentIds);
 
     // todo - can't call waitForThingsToLevelOut because it looks for
     // jettys of all shards
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index f6ee7b4de31..a3fbd19fd42 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,10 +40,8 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.CoreAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.cloud.AbstractDistribZkTestBase;
@@ -75,8 +74,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
-@LogLevel("org.apache.solr.cloud.api.collections=DEBUG")
 @Slow
+@LogLevel("org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.cloud.api.collections=DEBUG;org.apache.solr.cloud.OverseerTaskProcessor=DEBUG;org.apache.solr.util.TestInjection=DEBUG")
 public class ShardSplitTest extends BasicDistributedZkTest {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -424,10 +423,13 @@ public class ShardSplitTest extends BasicDistributedZkTest {
   }
 
   @Test
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
   public void testSplitWithChaosMonkey() throws Exception {
     waitForThingsToLevelOut(15);
 
+    log.info("Using legacyCloud=false for cluster");
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
+        .process(cloudClient);
+
     List indexers = new ArrayList<>();
     try {
       for (int i = 0; i < 1; i++) {
@@ -502,7 +504,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       // indexed are available in SolrCloud and if the split succeeded then all replicas of the sub-shard
       // must be consistent (i.e. have same numdocs)
 
-      log.info("Shard split request state is COMPLETED");
+      log.info("Shard split request state is {}", splitStatus == null ? "unknown" : splitStatus.getKey());
       stop.set(true);
       monkeyThread.join();
       Set addFails = new HashSet<>();
@@ -518,37 +520,9 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       cloudClient.getZkStateReader().forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
       log.info("Current collection state: {}", printClusterStateInfo(AbstractDistribZkTestBase.DEFAULT_COLLECTION));
 
-      boolean replicaCreationsFailed = false;
-      if (splitStatus == RequestStatusState.FAILED) {
-        // either one or more replica creation failed (because it may have been created on the same parent shard leader node)
-        // or the split may have failed while trying to soft-commit *after* all replicas have been created
-        // the latter counts as a successful switch even if the API doesn't say so
-        // so we must find a way to distinguish between the two
-        // an easy way to do that is to look at the sub-shard replicas and check if the replica core actually exists
-        // instead of existing solely inside the cluster state
-        DocCollection collectionState = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
-        Slice slice10 = collectionState.getSlice(SHARD1_0);
-        Slice slice11 = collectionState.getSlice(SHARD1_1);
-        if (slice10 != null && slice11 != null) {
-          for (Replica replica : slice10) {
-            if (!doesReplicaCoreExist(replica)) {
-              replicaCreationsFailed = true;
-              break;
-            }
-          }
-          for (Replica replica : slice11) {
-            if (!doesReplicaCoreExist(replica)) {
-              replicaCreationsFailed = true;
-              break;
-            }
-          }
-        }
-      }
-
       // true if sub-shard states switch to 'active' eventually
       AtomicBoolean areSubShardsActive = new AtomicBoolean(false);
-
-      if (!replicaCreationsFailed) {
+      if (splitStatus == RequestStatusState.COMPLETED) {
         // all sub-shard replicas were created successfully so all cores must recover eventually
         waitForRecoveriesToFinish(AbstractDistribZkTestBase.DEFAULT_COLLECTION, true);
         // let's wait for the overseer to switch shard states
@@ -611,23 +585,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
   }
 
-  private boolean doesReplicaCoreExist(Replica replica) throws IOException {
-    try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getStr(BASE_URL_PROP))
-        .withHttpClient(cloudClient.getLbClient().getHttpClient()).build()) {
-      String coreName = replica.getCoreName();
-      try {
-        CoreAdminResponse status = CoreAdminRequest.getStatus(coreName, client);
-        if (status.getCoreStatus(coreName) == null || status.getCoreStatus(coreName).size() == 0) {
-          return false;
-        }
-      } catch (Exception e) {
-        log.warn("Error gettting core status of replica " + replica + ". Perhaps it does not exist!", e);
-        return false;
-      }
-    }
-    return true;
-  }
-
   @Test
   public void testSplitShardWithRule() throws Exception {
     doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK);
@@ -718,6 +675,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     } else {
       subRanges = router.partitionRange(2, shard1Range);
     }
+    final Set documentIds = ConcurrentHashMap.newKeySet(1024);
     final List ranges = subRanges;
     final int[] docCounts = new int[ranges.size()];
     int numReplicas = shard1.getReplicas().size();
@@ -728,7 +686,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       del("*:*");
       for (int id = 0; id <= 100; id++) {
         String shardKey = "" + (char)('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
-        indexAndUpdateCount(router, ranges, docCounts, shardKey + "!" + String.valueOf(id), id);
+        indexAndUpdateCount(router, ranges, docCounts, shardKey + "!" + String.valueOf(id), id, documentIds);
       }
       commit();
 
@@ -742,7 +700,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
           Set deleted = new HashSet<>();
           for (int id = 101; id < max; id++) {
             try {
-              indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id);
+              indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
               Thread.sleep(sleep);
               if (usually(random)) {
                 String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101);
@@ -750,6 +708,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
                 try {
                   deleteAndUpdateCount(router, ranges, docCounts, delId);
                   deleted.add(delId);
+                  documentIds.remove(String.valueOf(delId));
                 } catch (Exception e) {
                   log.error("Exception while deleting docs", e);
                 }
@@ -790,7 +749,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
 
     waitForRecoveriesToFinish(true);
 
-    checkDocCountsAndShardStates(docCounts, numReplicas);
+    checkDocCountsAndShardStates(docCounts, numReplicas, documentIds);
   }
 
@@ -957,7 +916,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
   }
 
-  protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws Exception {
+  protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas, Set documentIds) throws Exception {
     ClusterState clusterState = null;
     Slice slice1_0 = null, slice1_1 = null;
     int i = 0;
@@ -1005,7 +964,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
     long shard11Count = response2.getResults().getNumFound();
 
-    logDebugHelp(docCounts, response, shard10Count, response2, shard11Count);
+    logDebugHelp(docCounts, response, shard10Count, response2, shard11Count, documentIds);
 
     assertEquals("Wrong doc count on shard1_0. See SOLR-5309", docCounts[0], shard10Count);
     assertEquals("Wrong doc count on shard1_1. See SOLR-5309", docCounts[1], shard11Count);
@@ -1068,12 +1027,13 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
   }
 
-  protected void indexAndUpdateCount(DocRouter router, List ranges, int[] docCounts, String id, int n) throws Exception {
+  protected void indexAndUpdateCount(DocRouter router, List ranges, int[] docCounts, String id, int n, Set documentIds) throws Exception {
     index("id", id, "n_ti", n);
 
     int idx = getHashRangeIdx(router, ranges, id);
     if (idx != -1) {
       docCounts[idx]++;
+      documentIds.add(String.valueOf(id));
     }
   }
 
@@ -1101,12 +1061,14 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     return -1;
   }
 
-  protected void logDebugHelp(int[] docCounts, QueryResponse response, long shard10Count, QueryResponse response2, long shard11Count) {
+  protected void logDebugHelp(int[] docCounts, QueryResponse response, long shard10Count, QueryResponse response2, long shard11Count, Set documentIds) {
     for (int i = 0; i < docCounts.length; i++) {
       int docCount = docCounts[i];
       log.info("Expected docCount for shard1_{} = {}", i, docCount);
     }
 
+    Set found = new HashSet<>(1024);
+
     log.info("Actual docCount for shard1_0 = {}", shard10Count);
     log.info("Actual docCount for shard1_1 = {}", shard11Count);
     Map idVsVersion = new HashMap<>();
@@ -1119,6 +1081,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       if (old != null) {
         log.error("EXTRA: ID: " + document.getFieldValue("id") + " on shard1_0. Old version: " + old.getFieldValue("_version_") + " new version: " + document.getFieldValue("_version_"));
       }
+      found.add(document.getFieldValue("id").toString());
     }
     for (int i = 0; i < response2.getResults().size(); i++) {
       SolrDocument document = response2.getResults().get(i);
@@ -1131,6 +1094,15 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       if (old != null) {
         log.error("EXTRA: ID: " + document.getFieldValue("id") + " on shard1_1. Old version: " + old.getFieldValue("_version_") + " new version: " + document.getFieldValue("_version_"));
       }
+      found.add(document.getFieldValue("id").toString());
     }
+
+    if (found.size() < documentIds.size()) {
+      documentIds.removeAll(found);
+      log.error("MISSING: ID: " + documentIds);
+    } else if (found.size() > documentIds.size()) {
+      found.removeAll(documentIds);
+      log.error("EXTRA: ID: " + found);
+    }
   }
 