SOLR-13815: enhance live split test to fail more often

This commit is contained in:
yonik 2019-10-13 16:45:32 -04:00 committed by Yonik Seeley
parent 509b74fa4e
commit 1d43bda284
1 changed files with 56 additions and 37 deletions

View File

@ -20,6 +20,9 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -31,6 +34,8 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
@ -194,39 +199,46 @@ public class SplitShardTest extends SolrCloudTestCase {
return totCount;
}
void doLiveSplitShard(String collectionName, int repFactor) throws Exception {
void doLiveSplitShard(String collectionName, int repFactor, int nThreads) throws Exception {
final CloudSolrClient client = createCollection(collectionName, repFactor);
final ConcurrentHashMap<String,Long> model = new ConcurrentHashMap<>(); // what the index should contain
final AtomicBoolean doIndex = new AtomicBoolean(true);
final AtomicInteger docsIndexed = new AtomicInteger();
Thread indexThread = null;
Thread[] indexThreads = new Thread[nThreads];
try {
// start indexing client before we initiate a shard split
indexThread = new Thread(() -> {
while (doIndex.get()) {
try {
// Thread.sleep(10); // uncomment this to cap indexing rate at 100 docs per second...
int currDoc = docsIndexed.get();
// Try all docs in the same update request
UpdateRequest updateReq = new UpdateRequest();
updateReq.add(sdoc("id", "doc_" + currDoc));
UpdateResponse ursp = updateReq.commit(client, collectionName);
assertEquals(0, ursp.getStatus()); // for now, don't accept any failures
if (ursp.getStatus() == 0) {
docsIndexed.incrementAndGet();
for (int i=0; i<nThreads; i++) {
indexThreads[i] = new Thread(() -> {
while (doIndex.get()) {
try {
// Thread.sleep(10); // cap indexing rate at 100 docs per second per thread
int currDoc = docsIndexed.incrementAndGet();
String docId = "doc_" + currDoc;
// Try all docs in the same update request
UpdateRequest updateReq = new UpdateRequest();
updateReq.add(sdoc("id", docId));
// UpdateResponse ursp = updateReq.commit(client, collectionName); // uncomment this if you want a commit each time
UpdateResponse ursp = updateReq.process(client, collectionName);
assertEquals(0, ursp.getStatus()); // for now, don't accept any failures
if (ursp.getStatus() == 0) {
model.put(docId, 1L); // in the future, keep track of a version per document and reuse ids to keep index from growing too large
}
} catch (Exception e) {
fail(e.getMessage());
break;
}
} catch (Exception e) {
fail(e.getMessage());
break;
}
}
});
indexThread.start();
});
}
for (Thread thread : indexThreads) {
thread.start();
}
Thread.sleep(100); // wait for a few docs to be indexed before invoking split
int docCount = docsIndexed.get();
int docCount = model.size();
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
.setShardName("shard1");
@ -235,39 +247,46 @@ public class SplitShardTest extends SolrCloudTestCase {
collectionName, activeClusterShape(2, 3*repFactor)); // 2 repFactor for the new split shards, 1 repFactor for old replicas
// make sure that docs were able to be indexed during the split
assertTrue(docsIndexed.get() > docCount);
assertTrue(model.size() > docCount);
Thread.sleep(100); // wait for a few more docs to be indexed after split
} finally {
// shut down the indexer
// shut down the indexers
doIndex.set(false);
if (indexThread != null) {
indexThread.join();
for (Thread thread : indexThreads) {
thread.join();
}
}
assertTrue(docsIndexed.get() > 0);
client.commit(); // final commit is needed for visibility
long numDocs = getNumDocs(client);
if (numDocs != docsIndexed.get()) {
// Find out what docs are missing.
for (int i = 0; i < docsIndexed.get(); i++) {
String id = "doc_" + i;
long cloudClientDocs = client.query(new SolrQuery("id:" + id)).getResults().getNumFound();
if (cloudClientDocs != 1) {
log.error("MISSING DOCUMENT " + id);
}
if (numDocs != model.size()) {
SolrDocumentList results = client.query(new SolrQuery("q","*:*", "fl","id", "rows", Integer.toString(model.size()) )).getResults();
Map<String,Long> leftover = new HashMap<>(model);
for (SolrDocument doc : results) {
String id = (String) doc.get("id");
leftover.remove(id);
}
log.error("MISSING DOCUMENTS: " + leftover);
}
assertEquals("Documents are missing!", docsIndexed.get(), numDocs);
log.info("Number of documents indexed and queried : " + numDocs);
}
@Test
public void testLiveSplit() throws Exception {
doLiveSplitShard("livesplit1", 1);
// Debugging tips: if this fails, it may be easier to debug by lowering the number fo threads to 1 and looping the test
// until you get another failure.
// You may need to further instrument things like DistributedZkUpdateProcessor to display the cluster state for the collection, etc.
// Using more threads increases the chance to hit a concurrency bug, but too many threads can overwhelm single-threaded buffering
// replay after the low level index split and result in subShard leaders that can't catch up and
// become active (a known issue that still needs to be resolved.)
doLiveSplitShard("livesplit1", 1, 4);
}