mirror of https://github.com/apache/lucene.git
SOLR-12607: Fixed two separate bugs in shard splits which can cause data loss. The first case is when using TLOG replicas only: the updates forwarded from the parent shard leader to the sub-shard leader are written only to the tlog and not to the index. If this happens after the buffered updates have been replayed, then the updates can never be executed even though they remain in the transaction log. The second case is when synchronously forwarding updates to the sub-shard leader fails and the underlying errors are not propagated to the client.
parent 64573c142c
commit 259bc2baf7
@@ -186,7 +186,13 @@ Bug Fixes
 * SOLR-12477: An update would return a client error(400) if it hit a AlreadyClosedException.
   We now return the error as a server error(500) instead (Jeffery via Varun Thacker)
 
 * SOLR-12606: Fix InfixSuggestersTest.testShutdownDuringBuild() failures. (Steve Rowe)
 
+* SOLR-12607: Fixed two separate bugs in shard splits which can cause data loss. The first case is when using TLOG
+  replicas only, the updates forwarded from parent shard leader to the sub-shard leader are written only in tlog and
+  not the index. If this happens after the buffered updates have been replayed then the updates can never be executed
+  even though they remain the transaction log. The second case is when synchronously forwarding updates to sub-shard
+  leader fails and the underlying errors are not propagated to the client. (Cao Manh Dat, shalin)
+
 Optimizations
 ----------------------
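The DistributedUpdateProcessor hunks below carry the core of the first fix: the IGNORE_INDEXWRITER flag, which lets a TLOG replica record an update in its transaction log without indexing it, must not be set when the local core is itself a sub-shard leader, because forwarded updates will never be replicated into its index later. A minimal self-contained sketch of that guard follows; the wrapper class and method name are illustrative, while the condition and flag handling mirror the diff:

import org.apache.solr.common.cloud.Replica;
import org.apache.solr.update.UpdateCommand;

class TlogIndexWriterGuard {
  /**
   * Only a plain TLOG replica that is neither a sub-shard leader nor replaying its
   * transaction log may skip the IndexWriter; its index is brought up to date later
   * by replication from the shard leader. A sub-shard leader receiving updates
   * forwarded by the parent shard leader must index them itself, otherwise the
   * updates survive only in the tlog (the data-loss case described above).
   */
  static void applyTlogFlags(UpdateCommand cmd, Replica.Type replicaType, boolean isSubShardLeader) {
    if (!isSubShardLeader
        && replicaType == Replica.Type.TLOG
        && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
      cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
    }
  }
}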
@@ -278,7 +278,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
           cloudManager).get(0).nodeName;// TODO: use replica type in this logic too
       }
     }
-    log.info("Node Identified {} for creating new replica", node);
+    log.info("Node Identified {} for creating new replica of shard {}", node, shard);
 
     if (!clusterState.liveNodesContain(node)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
@@ -1776,7 +1776,7 @@ public class CoreContainer {
    * @return true if we were able to successfuly perisist the repaired coreDescriptor, false otherwise.
    *
    * See SOLR-11503, This can be removed when there's no chance we'll need to upgrade a
-   * Solr isntallation createged with legacyCloud=true from 6.6.1 through 7.1
+   * Solr installation created with legacyCloud=true from 6.6.1 through 7.1
    */
  public boolean repairCoreProperty(CoreDescriptor cd, String prop) {
    // So far, coreNodeName is the only property that we need to repair, this may get more complex as other properties
@@ -432,13 +432,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange);
           if (isSubset &&
               (docId == null // in case of deletes
-              || (docId != null && coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll)))) {
+              || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) {
             Replica sliceLeader = aslice.getLeader();
             // slice leader can be null because node/shard is created zk before leader election
             if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) {
               if (nodes == null) nodes = new ArrayList<>();
               ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
-              nodes.add(new StdNode(nodeProps, coll.getName(), shardId));
+              nodes.add(new StdNode(nodeProps, coll.getName(), aslice.getName()));
             }
           }
         }
@@ -896,7 +896,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         // not the leader anymore maybe or the error'd node is not my replica?
         if (!foundErrorNodeInReplicaList) {
           log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+
-              shardId+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
+              cloudDesc.getShardId()+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
               "No request recovery command will be sent!");
           if (!shardId.equals(cloudDesc.getShardId())) {
             // some replicas on other shard did not receive the updates (ex: during splitshard),
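The hunk above touches the second bug: when a synchronously forwarded update fails and the failed node is not one of this core's replicas but the leader of another (sub-)shard, a leader-initiated recovery cannot repair it, so the failure has to be reported back to the client. A hedged sketch of the idea follows; the variable names mirror the hunk, but the propagation step (the errorsForClient collection) is assumed and simplified rather than quoted from the real code:

if (!foundErrorNodeInReplicaList) {
  log.warn("Core " + cloudDesc.getCoreNodeName() + " belonging to " + collection + " "
      + cloudDesc.getShardId() + ", does not have error'd node "
      + stdNode.getNodeProps().getCoreUrl() + " as a replica. No request recovery command will be sent!");
  if (!shardId.equals(cloudDesc.getShardId())) {
    // The failed forward went to a different shard (e.g. a sub-shard leader during a split),
    // so instead of being swallowed the error is kept and surfaced to the client.
    errorsForClient.add(error);   // assumed collection name, simplified from the real code
  }
}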
@@ -1150,7 +1150,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         }
       }
     }
-    if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+    if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
       cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
     }
   }
@@ -1692,7 +1692,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       return;
     }
 
-    if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+    if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
       // TLOG replica not leader, don't write the DBQ to IW
       cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
     }
@@ -1851,7 +1851,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       }
     }
 
-    if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+    if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
       cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
     }
   }
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.util.LuceneTestCase.Slow;
@@ -70,6 +72,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
     final List<DocRouter.Range> ranges = router.partitionRange(2, shard1Range);
     final int[] docCounts = new int[ranges.size()];
     int numReplicas = shard1.getReplicas().size();
+    final Set<String> documentIds = ConcurrentHashMap.newKeySet(1024);
 
     Thread indexThread = null;
     OverseerRestarter killer = null;
@@ -79,7 +82,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
     try {
       del("*:*");
       for (int id = 0; id < 100; id++) {
-        indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id);
+        indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
       }
       commit();
 
@@ -89,7 +92,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
       int max = atLeast(401);
       for (int id = 101; id < max; id++) {
         try {
-          indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id);
+          indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
           Thread.sleep(atLeast(25));
         } catch (Exception e) {
           log.error("Exception while adding doc", e);
@@ -153,7 +156,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
       }
     }
 
-    checkDocCountsAndShardStates(docCounts, numReplicas);
+    checkDocCountsAndShardStates(docCounts, numReplicas, documentIds);
 
     // todo - can't call waitForThingsToLevelOut because it looks for
     // jettys of all shards
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,10 +40,8 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.CoreAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.cloud.AbstractDistribZkTestBase;
@@ -75,8 +74,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
-@LogLevel("org.apache.solr.cloud.api.collections=DEBUG")
 @Slow
+@LogLevel("org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.cloud.api.collections=DEBUG;org.apache.solr.cloud.OverseerTaskProcessor=DEBUG;org.apache.solr.util.TestInjection=DEBUG")
 public class ShardSplitTest extends BasicDistributedZkTest {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -424,10 +423,13 @@ public class ShardSplitTest extends BasicDistributedZkTest {
   }
 
   @Test
+  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
   public void testSplitWithChaosMonkey() throws Exception {
     waitForThingsToLevelOut(15);
 
     log.info("Using legacyCloud=false for cluster");
     CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
         .process(cloudClient);
 
+    List<StoppableIndexingThread> indexers = new ArrayList<>();
     try {
       for (int i = 0; i < 1; i++) {
@@ -502,7 +504,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       // indexed are available in SolrCloud and if the split succeeded then all replicas of the sub-shard
       // must be consistent (i.e. have same numdocs)
 
-      log.info("Shard split request state is COMPLETED");
+      log.info("Shard split request state is {}", splitStatus == null ? "unknown" : splitStatus.getKey());
       stop.set(true);
       monkeyThread.join();
       Set<String> addFails = new HashSet<>();
@@ -518,37 +520,9 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       cloudClient.getZkStateReader().forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
       log.info("Current collection state: {}", printClusterStateInfo(AbstractDistribZkTestBase.DEFAULT_COLLECTION));
 
-      boolean replicaCreationsFailed = false;
-      if (splitStatus == RequestStatusState.FAILED) {
-        // either one or more replica creation failed (because it may have been created on the same parent shard leader node)
-        // or the split may have failed while trying to soft-commit *after* all replicas have been created
-        // the latter counts as a successful switch even if the API doesn't say so
-        // so we must find a way to distinguish between the two
-        // an easy way to do that is to look at the sub-shard replicas and check if the replica core actually exists
-        // instead of existing solely inside the cluster state
-        DocCollection collectionState = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
-        Slice slice10 = collectionState.getSlice(SHARD1_0);
-        Slice slice11 = collectionState.getSlice(SHARD1_1);
-        if (slice10 != null && slice11 != null) {
-          for (Replica replica : slice10) {
-            if (!doesReplicaCoreExist(replica)) {
-              replicaCreationsFailed = true;
-              break;
-            }
-          }
-          for (Replica replica : slice11) {
-            if (!doesReplicaCoreExist(replica)) {
-              replicaCreationsFailed = true;
-              break;
-            }
-          }
-        }
-      }
-
       // true if sub-shard states switch to 'active' eventually
       AtomicBoolean areSubShardsActive = new AtomicBoolean(false);
 
-      if (!replicaCreationsFailed) {
       if (splitStatus == RequestStatusState.COMPLETED) {
         // all sub-shard replicas were created successfully so all cores must recover eventually
         waitForRecoveriesToFinish(AbstractDistribZkTestBase.DEFAULT_COLLECTION, true);
         // let's wait for the overseer to switch shard states
@@ -611,23 +585,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
   }
 
-  private boolean doesReplicaCoreExist(Replica replica) throws IOException {
-    try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getStr(BASE_URL_PROP))
-        .withHttpClient(cloudClient.getLbClient().getHttpClient()).build()) {
-      String coreName = replica.getCoreName();
-      try {
-        CoreAdminResponse status = CoreAdminRequest.getStatus(coreName, client);
-        if (status.getCoreStatus(coreName) == null || status.getCoreStatus(coreName).size() == 0) {
-          return false;
-        }
-      } catch (Exception e) {
-        log.warn("Error gettting core status of replica " + replica + ". Perhaps it does not exist!", e);
-        return false;
-      }
-    }
-    return true;
-  }
-
   @Test
   public void testSplitShardWithRule() throws Exception {
     doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK);
@@ -718,6 +675,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     } else {
       subRanges = router.partitionRange(2, shard1Range);
     }
+    final Set<String> documentIds = ConcurrentHashMap.newKeySet(1024);
     final List<DocRouter.Range> ranges = subRanges;
     final int[] docCounts = new int[ranges.size()];
     int numReplicas = shard1.getReplicas().size();
@@ -728,7 +686,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       del("*:*");
       for (int id = 0; id <= 100; id++) {
         String shardKey = "" + (char)('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
-        indexAndUpdateCount(router, ranges, docCounts, shardKey + "!" + String.valueOf(id), id);
+        indexAndUpdateCount(router, ranges, docCounts, shardKey + "!" + String.valueOf(id), id, documentIds);
       }
       commit();
 
@@ -742,7 +700,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       Set<String> deleted = new HashSet<>();
       for (int id = 101; id < max; id++) {
         try {
-          indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id);
+          indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
           Thread.sleep(sleep);
           if (usually(random)) {
             String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101);
@@ -750,6 +708,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
             try {
               deleteAndUpdateCount(router, ranges, docCounts, delId);
               deleted.add(delId);
+              documentIds.remove(String.valueOf(delId));
             } catch (Exception e) {
               log.error("Exception while deleting docs", e);
             }
@@ -790,7 +749,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
 
     waitForRecoveriesToFinish(true);
-    checkDocCountsAndShardStates(docCounts, numReplicas);
+    checkDocCountsAndShardStates(docCounts, numReplicas, documentIds);
   }
 
 
@@ -957,7 +916,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
   }
 
-  protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws Exception {
+  protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas, Set<String> documentIds) throws Exception {
     ClusterState clusterState = null;
     Slice slice1_0 = null, slice1_1 = null;
     int i = 0;
@@ -1005,7 +964,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
     long shard11Count = response2.getResults().getNumFound();
 
-    logDebugHelp(docCounts, response, shard10Count, response2, shard11Count);
+    logDebugHelp(docCounts, response, shard10Count, response2, shard11Count, documentIds);
 
     assertEquals("Wrong doc count on shard1_0. See SOLR-5309", docCounts[0], shard10Count);
     assertEquals("Wrong doc count on shard1_1. See SOLR-5309", docCounts[1], shard11Count);
@@ -1068,12 +1027,13 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
   }
 
-  protected void indexAndUpdateCount(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id, int n) throws Exception {
+  protected void indexAndUpdateCount(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id, int n, Set<String> documentIds) throws Exception {
     index("id", id, "n_ti", n);
 
     int idx = getHashRangeIdx(router, ranges, id);
     if (idx != -1) {
       docCounts[idx]++;
+      documentIds.add(String.valueOf(id));
     }
   }
 
@@ -1101,12 +1061,14 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     return -1;
   }
 
-  protected void logDebugHelp(int[] docCounts, QueryResponse response, long shard10Count, QueryResponse response2, long shard11Count) {
+  protected void logDebugHelp(int[] docCounts, QueryResponse response, long shard10Count, QueryResponse response2, long shard11Count, Set<String> documentIds) {
     for (int i = 0; i < docCounts.length; i++) {
       int docCount = docCounts[i];
       log.info("Expected docCount for shard1_{} = {}", i, docCount);
     }
 
+    Set<String> found = new HashSet<>(1024);
+
     log.info("Actual docCount for shard1_0 = {}", shard10Count);
     log.info("Actual docCount for shard1_1 = {}", shard11Count);
     Map<String, String> idVsVersion = new HashMap<>();
@@ -1119,6 +1081,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       if (old != null) {
         log.error("EXTRA: ID: " + document.getFieldValue("id") + " on shard1_0. Old version: " + old.getFieldValue("_version_") + " new version: " + document.getFieldValue("_version_"));
       }
+      found.add(document.getFieldValue("id").toString());
     }
     for (int i = 0; i < response2.getResults().size(); i++) {
       SolrDocument document = response2.getResults().get(i);
@@ -1131,6 +1094,15 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       if (old != null) {
         log.error("EXTRA: ID: " + document.getFieldValue("id") + " on shard1_1. Old version: " + old.getFieldValue("_version_") + " new version: " + document.getFieldValue("_version_"));
       }
+      found.add(document.getFieldValue("id").toString());
     }
+
+    if (found.size() < documentIds.size()) {
+      documentIds.removeAll(found);
+      log.error("MISSING: ID: " + documentIds);
+    } else if (found.size() > documentIds.size()) {
+      found.removeAll(documentIds);
+      log.error("EXTRA: ID: " + found);
+    }
   }
 
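Taken together, the ShardSplitTest changes add one consistency check: the test now remembers exactly which document ids should exist after the split (added on index, removed on delete) and diffs them against the ids actually returned by the two sub-shards, logging MISSING and EXTRA ids. A small self-contained sketch of that bookkeeping, with an illustrative wrapper class:

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SplitConsistencyCheck {
  // Expected ids: add when a document is indexed, remove when it is deleted.
  final Set<String> documentIds = ConcurrentHashMap.newKeySet(1024);

  // Compare the ids returned by the sub-shards against the expected set.
  void report(Set<String> found) {
    if (found.size() < documentIds.size()) {
      Set<String> missing = new HashSet<>(documentIds);
      missing.removeAll(found);
      System.err.println("MISSING: ID: " + missing);  // documents lost by the split
    } else if (found.size() > documentIds.size()) {
      Set<String> extra = new HashSet<>(found);
      extra.removeAll(documentIds);
      System.err.println("EXTRA: ID: " + extra);      // unexpected or duplicated documents
    }
  }
}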