SOLR-9488: Shard split can fail to write commit data on shutdown/restart causing replicas to recover without replicating the index

Shalin Shekhar Mangar 2016-09-07 21:06:50 +05:30
parent ce84a8df12
commit 0c5c0df6bc
5 changed files with 177 additions and 3 deletions

File: solr/CHANGES.txt

@@ -116,6 +116,9 @@ Bug Fixes
* SOLR-9381: Snitch for freedisk uses '/' instead of 'coreRootDirectory' (Tim Owen, noble)
* SOLR-9488: Shard split can fail to write commit data on shutdown/restart causing replicas to recover
without replicating the index. This can cause data loss. (shalin)
Optimizations
----------------------

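For context on the fix: the index version that a leader advertises to recovering replicas is derived from the commit time stored in its latest commit's user data under SolrIndexWriter.COMMIT_TIME_MSEC_KEY. The shard-split commit was written without that user data, so after a shutdown or restart the sub-shard leader reported version 0 and a newly added replica skipped replication entirely (see the latestVersion == 0 branch in IndexFetcher below). The following is a minimal sketch of that derivation using the standard Lucene IndexCommit API; the class and method names are illustrative, not Solr's actual API.

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.index.IndexCommit;
import org.apache.solr.update.SolrIndexWriter;

// Illustrative helper, not Solr's actual API: the advertised index version is
// essentially the commit time recorded in the latest commit's user data. A commit
// written without COMMIT_TIME_MSEC_KEY therefore advertises version 0, and the
// fetching replica concludes there is nothing to replicate.
final class CommitVersionSketch {
  static long indexVersionOf(IndexCommit commit) throws IOException {
    Map<String, String> userData = commit.getUserData(); // user data recorded with the commit point
    String commitTime = (userData == null) ? null : userData.get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
    return (commitTime == null) ? 0L : Long.parseLong(commitTime);
  }
}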
File: solr/core/src/java/org/apache/solr/handler/IndexFetcher.java

@@ -308,6 +308,9 @@ public class IndexFetcher {
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
LOG.info("Master's generation: " + latestGeneration);
LOG.info("Master's version: " + latestVersion);
// TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
if (commit == null) {
@@ -326,6 +329,7 @@
}
}
LOG.info("Slave's generation: " + commit.getGeneration());
if (latestVersion == 0L) {
if (forceReplication && commit.getGeneration() != 0) {
@@ -353,8 +357,6 @@
successfulInstall = true;
return true;
}
LOG.info("Master's generation: " + latestGeneration);
LOG.info("Slave's generation: " + commit.getGeneration());
LOG.info("Starting replication process");
// get the list of files first
fetchFileList(latestGeneration);

File: solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java

@@ -519,6 +519,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
@SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
" but currently suspiciously used for replication as well")
/*
Also see SolrIndexSplitter.setCommitData
*/
private void setCommitData(IndexWriter iw) {
final Map<String,String> commitData = new HashMap<>();
commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,
String.valueOf(System.currentTimeMillis()));

File: solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java

@@ -19,7 +19,9 @@ package org.apache.solr.update;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterCodecReader;
@@ -40,6 +42,7 @@ import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.BitsFilteredPostingsEnum;
@@ -134,6 +137,11 @@ public class SolrIndexSplitter {
CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
}
// we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
// because the sub-shard cores would simply ignore such a commit while the update log is not
// yet in an active state.
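// setCommitData records the commit time in the commit user data. Without it, the sub-shard's
// latest commit advertises index version 0, so a replica recovering via replication would skip
// copying the index: the data-loss scenario described in SOLR-9488.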
setCommitData(iw);
iw.commit();
success = true;
} finally {
if (iwRef != null) {
@@ -151,7 +159,13 @@
}
@SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
" but currently suspiciously used for replication as well")
private void setCommitData(IndexWriter iw) {
final Map<String,String> commitData = new HashMap<>();
commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
iw.setLiveCommitData(commitData.entrySet());
}
FixedBitSet[] split(LeafReaderContext readerContext) throws IOException {
LeafReader reader = readerContext.reader();

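The setCommitData helper added above only stages the user data on the IndexWriter; it becomes durable when iw.commit() runs, which is why the split sets the commit data immediately before its explicit commit. Below is a small, self-contained plain-Lucene sketch of that round trip; the class name and the literal key string are illustrative (Solr uses SolrIndexWriter.COMMIT_TIME_MSEC_KEY).

import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.RAMDirectory;

// Self-contained sketch: live commit data set on the writer is persisted only
// with the next commit point, after which any reader of that commit can see it.
public class CommitDataRoundTrip {
  public static void main(String[] args) throws Exception {
    RAMDirectory dir = new RAMDirectory();
    try (IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
      Map<String, String> commitData = new HashMap<>();
      commitData.put("commitTimeMSec", String.valueOf(System.currentTimeMillis())); // illustrative key
      iw.setLiveCommitData(commitData.entrySet()); // staged on the writer, not yet durable
      iw.commit();                                 // written out with this commit point
    }
    try (DirectoryReader reader = DirectoryReader.open(dir)) {
      // a replica-side check reads the same map back from the commit it inspects
      System.out.println(reader.getIndexCommit().getUserData());
    }
  }
}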
File: solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java

@@ -25,20 +25,25 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
@@ -56,6 +61,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
@@ -71,6 +77,12 @@ public class ShardSplitTest extends BasicDistributedZkTest {
schemaString = "schema15.xml"; // we need a string id
}
@Override
public void distribSetUp() throws Exception {
super.distribSetUp();
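// use the default persistent directory factory rather than the test framework's RAM-based one,
// presumably so the sub-shard index survives the node restart performed in testSplitStaticIndexReplication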
useFactory(null);
}
@Test
public void test() throws Exception {
@@ -91,6 +103,146 @@ public class ShardSplitTest extends BasicDistributedZkTest {
//waitForThingsToLevelOut(15);
}
/*
Creates a collection with replicationFactor=1 and splits a shard. Restarts the sub-shard leader node,
adds a replica, and ensures the document counts match between the leader and the replica.
*/
public void testSplitStaticIndexReplication() throws Exception {
waitForThingsToLevelOut(15);
DocCollection defCol = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
Replica replica = defCol.getReplicas().get(0);
String nodeName = replica.getNodeName();
String collectionName = "testSplitStaticIndexReplication";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
create.setCreateNodeSet(nodeName); // we want to create the leader on a fixed node so that we know which one to restart later
create.process(cloudClient);
try (CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), true, cloudClient.getLbClient().getHttpClient())) {
client.setDefaultCollection(collectionName);
StoppableIndexingThread thread = new StoppableIndexingThread(controlClient, client, "i1", true);
try {
thread.start();
Thread.sleep(1000); // give the indexer some time to do its work
thread.safeStop();
thread.join();
client.commit();
controlClient.commit();
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
splitShard.setShardName(SHARD1);
String asyncId = splitShard.processAsync(client);
RequestStatusState state = CollectionAdminRequest.requestStatus(asyncId).waitFor(client, 120);
if (state == RequestStatusState.COMPLETED) {
waitForRecoveriesToFinish(collectionName, true);
// wait for the parent shard to become inactive and both sub-shards to become active
CountDownLatch latch = new CountDownLatch(1);
client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
@Override
public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
Slice parent = collectionState.getSlice(SHARD1);
Slice slice10 = collectionState.getSlice(SHARD1_0);
Slice slice11 = collectionState.getSlice(SHARD1_1);
if (slice10 != null && slice11 != null &&
parent.getState() == Slice.State.INACTIVE &&
slice10.getState() == Slice.State.ACTIVE &&
slice11.getState() == Slice.State.ACTIVE) {
latch.countDown();
return true; // removes the watch
}
return false;
}
});
latch.await(1, TimeUnit.MINUTES);
if (latch.getCount() != 0) {
// sanity check
fail("Sub-shards did not become active even after waiting for 1 minute");
}
int liveNodeCount = client.getZkStateReader().getClusterState().getLiveNodes().size();
// restart the sub-shard leader node
boolean restarted = false;
for (JettySolrRunner jetty : jettys) {
int port = jetty.getBaseUrl().getPort();
if (replica.getStr(BASE_URL_PROP).contains(":" + port)) {
ChaosMonkey.kill(jetty);
ChaosMonkey.start(jetty);
restarted = true;
break;
}
}
if (!restarted) {
// sanity check
fail("We could not find a jetty to kill for replica: " + replica.getCoreUrl());
}
// add a new replica for the sub-shard
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collectionName, SHARD1_0);
// use the control collection's node because it is less likely to be the one being restarted,
// which avoids test flakiness caused by NoHttpResponseExceptions
String control_collection = client.getZkStateReader().getClusterState().getCollection("control_collection").getReplicas().get(0).getStr(BASE_URL_PROP);
try (HttpSolrClient control = new HttpSolrClient.Builder(control_collection).withHttpClient(client.getLbClient().getHttpClient()).build()) {
state = addReplica.processAndWait(control, 30);
}
if (state == RequestStatusState.COMPLETED) {
CountDownLatch newReplicaLatch = new CountDownLatch(1);
client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
@Override
public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
if (liveNodes.size() != liveNodeCount) {
return false;
}
Slice slice = collectionState.getSlice(SHARD1_0);
if (slice.getReplicas().size() == 2) {
if (!slice.getReplicas().stream().anyMatch(r -> r.getState() == Replica.State.RECOVERING)) {
// we see replicas and none of them are recovering
newReplicaLatch.countDown();
return true;
}
}
return false;
}
});
newReplicaLatch.await(30, TimeUnit.SECONDS);
// check consistency of the sub-shard replicas explicitly because the checkShardConsistency methods
// don't handle new shards/replicas well.
ClusterState clusterState = client.getZkStateReader().getClusterState();
DocCollection collection = clusterState.getCollection(collectionName);
int numReplicasChecked = assertConsistentReplicas(collection.getSlice(SHARD1_0));
assertEquals("We should have checked consistency for exactly 2 replicas of shard1_0", 2, numReplicasChecked);
} else {
fail("Adding a replica to sub-shard did not complete even after waiting for 30 seconds!. Saw state = " + state.getKey());
}
} else {
fail("We expected shard split to succeed on a static index but it didn't. Found state = " + state.getKey());
}
} finally {
thread.safeStop();
thread.join();
}
}
}
private int assertConsistentReplicas(Slice shard) throws SolrServerException, IOException {
long numFound = Long.MIN_VALUE;
int count = 0;
for (Replica replica : shard.getReplicas()) {
try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl())
.withHttpClient(cloudClient.getLbClient().getHttpClient()).build()) {
QueryResponse response = client.query(new SolrQuery("q", "*:*", "distrib", "false"));
log.info("Found numFound={} on replica: {}", response.getResults().getNumFound(), replica.getCoreUrl());
if (numFound == Long.MIN_VALUE) {
numFound = response.getResults().getNumFound();
} else {
assertEquals("Shard " + shard.getName() + " replicas do not have same number of documents", numFound, response.getResults().getNumFound());
}
}
count++;
}
return count;
}
/**
* Used to test that we can split a shard when a previous split event
* left sub-shards in construction or recovery state.