SOLR-9488: Shard split can fail to write commit data on shutdown/restart causing replicas to recover without replicating the index
parent ce84a8df12 · commit 0c5c0df6bc
@@ -116,6 +116,9 @@ Bug Fixes

* SOLR-9381: Snitch for freedisk uses '/' instead of 'coreRootDirectory' (Tim Owen, noble)

* SOLR-9488: Shard split can fail to write commit data on shutdown/restart causing replicas to recover
  without replicating the index. This can cause data loss. (shalin)

Optimizations
----------------------
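The heart of the fix is that SolrIndexSplitter now writes commit user data (a commit timestamp) when it commits the split sub-indexes, so recovery can correctly compare a restarted sub-shard leader against its replicas. As a minimal sketch of attaching user data to a Lucene commit point — a standalone Lucene 6.x-era example outside Solr's wiring, with the key name mirroring SolrIndexWriter.COMMIT_TIME_MSEC_KEY and the class name purely illustrative:

import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.RAMDirectory;

public class CommitUserDataSketch {
  public static void main(String[] args) throws Exception {
    try (RAMDirectory dir = new RAMDirectory();
         IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
      // Attach user data to the next commit, as the patch's setCommitData does.
      Map<String, String> commitData = new HashMap<>();
      commitData.put("commitTimeMSec", String.valueOf(System.currentTimeMillis()));
      iw.setLiveCommitData(commitData.entrySet());
      iw.commit(); // the user data is persisted with this commit point
    }
  }
}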
@@ -308,6 +308,9 @@ public class IndexFetcher {
    long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
    long latestGeneration = (Long) response.get(GENERATION);

    LOG.info("Master's generation: " + latestGeneration);
    LOG.info("Master's version: " + latestVersion);

    // TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
    IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
    if (commit == null) {
@@ -326,6 +329,7 @@ public class IndexFetcher {
      }
    }

    LOG.info("Slave's generation: " + commit.getGeneration());

    if (latestVersion == 0L) {
      if (forceReplication && commit.getGeneration() != 0) {
@@ -353,8 +357,6 @@ public class IndexFetcher {
        successfulInstall = true;
        return true;
      }
      LOG.info("Master's generation: " + latestGeneration);
      LOG.info("Slave's generation: " + commit.getGeneration());
      LOG.info("Starting replication process");
      // get the list of files first
      fetchFileList(latestGeneration);
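For orientation, IndexFetcher's replication decision boils down to comparing the master's index version and generation against the slave's latest commit. A simplified condensation of that decision — method and parameter names are illustrative, not the actual Solr control flow:

// Illustrative condensation of IndexFetcher's decision; not the real method.
class ReplicationDecisionSketch {
  boolean shouldReplicate(long masterVersion, long slaveVersion,
                          long slaveGeneration, boolean forceReplication) {
    if (masterVersion == 0L) {
      // The master has an empty index: only a forced replication against a
      // non-empty slave makes the slave match (by clearing its index).
      return forceReplication && slaveGeneration != 0;
    }
    // Otherwise replicate when forced or when the versions differ.
    return forceReplication || masterVersion != slaveVersion;
  }
}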
@@ -519,6 +519,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState

  @SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
      " but currently suspiciously used for replication as well")
  /*
   Also see SolrIndexSplitter.setCommitData
   */
  private void setCommitData(IndexWriter iw) {
    final Map<String,String> commitData = new HashMap<>();
    commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,
@@ -19,7 +19,9 @@ package org.apache.solr.update;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterCodecReader;
@@ -40,6 +42,7 @@ import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.BitsFilteredPostingsEnum;
@@ -134,6 +137,11 @@ public class SolrIndexSplitter {
        CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
        iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
      }
      // we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
      // because the sub-shard cores would ignore such a commit: the update log is not
      // in an active state at this time
      setCommitData(iw);
      iw.commit();
      success = true;
    } finally {
      if (iwRef != null) {
@@ -151,7 +159,13 @@ public class SolrIndexSplitter {

  }

  @SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
      " but currently suspiciously used for replication as well")
  private void setCommitData(IndexWriter iw) {
    final Map<String,String> commitData = new HashMap<>();
    commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
    iw.setLiveCommitData(commitData.entrySet());
  }

  FixedBitSet[] split(LeafReaderContext readerContext) throws IOException {
    LeafReader reader = readerContext.reader();
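To verify that the commit user data actually landed in the index, it can be read back from the latest commit point. A small standalone check, assuming a plain Lucene index directory on disk and the "commitTimeMSec" key written above (the class name and path argument are hypothetical):

import java.nio.file.Paths;
import java.util.Map;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.FSDirectory;

public class CommitDataCheck {
  public static void main(String[] args) throws Exception {
    try (FSDirectory dir = FSDirectory.open(Paths.get(args[0]));
         DirectoryReader reader = DirectoryReader.open(dir)) {
      // User data is stored per commit point; an empty map means setCommitData never ran.
      Map<String, String> userData = reader.getIndexCommit().getUserData();
      System.out.println("commitTimeMSec = " + userData.get("commitTimeMSec"));
    }
  }
}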
@@ -25,20 +25,25 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
|
@ -56,6 +61,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
|
||||
|
@@ -71,6 +77,12 @@ public class ShardSplitTest extends BasicDistributedZkTest {
    schemaString = "schema15.xml"; // we need a string id
  }

  @Override
  public void distribSetUp() throws Exception {
    super.distribSetUp();
    useFactory(null); // use the default persistent directory factory so indexes survive the node restarts in this test
  }

  @Test
  public void test() throws Exception {
@@ -91,6 +103,146 @@ public class ShardSplitTest extends BasicDistributedZkTest {
    //waitForThingsToLevelOut(15);
  }

  /*
   * Creates a collection with replicationFactor=1 and splits a shard. Restarts the sub-shard leader node,
   * adds a replica, and ensures that the document counts match between the leader and the replica.
   */
  public void testSplitStaticIndexReplication() throws Exception {
    waitForThingsToLevelOut(15);

    DocCollection defCol = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
    Replica replica = defCol.getReplicas().get(0);
    String nodeName = replica.getNodeName();

    String collectionName = "testSplitStaticIndexReplication";
    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
    create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
    create.setCreateNodeSet(nodeName); // we want to create the leader on a fixed node so that we know which one to restart later
    create.process(cloudClient);
    try (CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), true, cloudClient.getLbClient().getHttpClient())) {
      client.setDefaultCollection(collectionName);
      StoppableIndexingThread thread = new StoppableIndexingThread(controlClient, client, "i1", true);
      try {
        thread.start();
        Thread.sleep(1000); // give the indexer some time to do its work
        thread.safeStop();
        thread.join();
        client.commit();
        controlClient.commit();

        CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
        splitShard.setShardName(SHARD1);
        String asyncId = splitShard.processAsync(client);
        RequestStatusState state = CollectionAdminRequest.requestStatus(asyncId).waitFor(client, 120);
        if (state == RequestStatusState.COMPLETED) {
          waitForRecoveriesToFinish(collectionName, true);
          // wait for the parent shard to become inactive
          CountDownLatch latch = new CountDownLatch(1);
          client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
            @Override
            public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
              Slice parent = collectionState.getSlice(SHARD1);
              Slice slice10 = collectionState.getSlice(SHARD1_0);
              Slice slice11 = collectionState.getSlice(SHARD1_1);
              if (slice10 != null && slice11 != null &&
                  parent.getState() == Slice.State.INACTIVE &&
                  slice10.getState() == Slice.State.ACTIVE &&
                  slice11.getState() == Slice.State.ACTIVE) {
                latch.countDown();
                return true; // removes the watch
              }
              return false;
            }
          });
          latch.await(1, TimeUnit.MINUTES);
          if (latch.getCount() != 0) {
            // sanity check
            fail("Sub-shards did not become active even after waiting for 1 minute");
          }

          int liveNodeCount = client.getZkStateReader().getClusterState().getLiveNodes().size();

          // restart the sub-shard leader node
          boolean restarted = false;
          for (JettySolrRunner jetty : jettys) {
            int port = jetty.getBaseUrl().getPort();
            if (replica.getStr(BASE_URL_PROP).contains(":" + port)) {
              ChaosMonkey.kill(jetty);
              ChaosMonkey.start(jetty);
              restarted = true;
              break;
            }
          }
          if (!restarted) {
            // sanity check
            fail("We could not find a jetty to kill for replica: " + replica.getCoreUrl());
          }

          // add a new replica for the sub-shard
          CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collectionName, SHARD1_0);
          // use the control client because there is less chance of it pointing at the restarted node;
          // this avoids test flakiness caused by NoHttpResponseExceptions
          String control_collection = client.getZkStateReader().getClusterState().getCollection("control_collection").getReplicas().get(0).getStr(BASE_URL_PROP);
          try (HttpSolrClient control = new HttpSolrClient.Builder(control_collection).withHttpClient(client.getLbClient().getHttpClient()).build()) {
            state = addReplica.processAndWait(control, 30);
          }
          if (state == RequestStatusState.COMPLETED) {
            CountDownLatch newReplicaLatch = new CountDownLatch(1);
            client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
              @Override
              public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
                if (liveNodes.size() != liveNodeCount) {
                  return false;
                }
                Slice slice = collectionState.getSlice(SHARD1_0);
                if (slice.getReplicas().size() == 2) {
                  if (slice.getReplicas().stream().noneMatch(r -> r.getState() == Replica.State.RECOVERING)) {
                    // we see both replicas and none of them are recovering
                    newReplicaLatch.countDown();
                    return true;
                  }
                }
                return false;
              }
            });
            newReplicaLatch.await(30, TimeUnit.SECONDS);
            // check the consistency of the sub-shard replicas explicitly because the checkShardConsistency
            // methods don't handle new shards/replicas well
            ClusterState clusterState = client.getZkStateReader().getClusterState();
            DocCollection collection = clusterState.getCollection(collectionName);
            int numReplicasChecked = assertConsistentReplicas(collection.getSlice(SHARD1_0));
            assertEquals("We should have checked consistency for exactly 2 replicas of shard1_0", 2, numReplicasChecked);
          } else {
            fail("Adding a replica to the sub-shard did not complete even after waiting for 30 seconds! Saw state = " + state.getKey());
          }
        } else {
          fail("We expected the shard split to succeed on a static index but it didn't. Found state = " + state.getKey());
        }
      } finally {
        thread.safeStop();
        thread.join();
      }
    }
  }

  private int assertConsistentReplicas(Slice shard) throws SolrServerException, IOException {
    long numFound = Long.MIN_VALUE;
    int count = 0;
    for (Replica replica : shard.getReplicas()) {
      try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl())
          .withHttpClient(cloudClient.getLbClient().getHttpClient()).build()) {
        QueryResponse response = client.query(new SolrQuery("q", "*:*", "distrib", "false"));
        log.info("Found numFound={} on replica: {}", response.getResults().getNumFound(), replica.getCoreUrl());
        if (numFound == Long.MIN_VALUE) {
          numFound = response.getResults().getNumFound();
        } else {
          assertEquals("Shard " + shard.getName() + " replicas do not have the same number of documents", numFound, response.getResults().getNumFound());
        }
      }
      count++;
    }
    return count;
  }
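To exercise just the new test in isolation, a typical Lucene/Solr 6.x ant invocation would look like the following (standard test-runner flags; assuming they behave this way in your checkout):

cd solr/core
ant test -Dtestcase=ShardSplitTest -Dtests.method=testSplitStaticIndexReplication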
  /**
   * Used to test that we can split a shard when a previous split event
   * left sub-shards in construction or recovery state.