SOLR-12999: Harden TestReplicationHandlerDiskOverFlow against sporadic timing failures

- ensure IndexFetcher injection is reset in @After method
 - replace System.out with Logger
 - Log and fail on any exceptions in any callbacks/threads
 - use CyclicBarrier (instead of CountdownLatch) to ensure the Query Thread loop doesn't monopolize
   CPU preventing IndexFetcher callback from ever being run

(Some of these improvements directly address jenkins failures we've been seeing)
This commit is contained in:
Chris Hostetter 2019-05-14 13:35:19 -07:00
parent 3764c727e5
commit bf8c6ea435

View File

@ -19,11 +19,13 @@ package org.apache.solr.handler;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
@ -34,6 +36,7 @@ import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.SolrException;
import org.apache.solr.util.LogLevel;
import org.junit.After;
import org.junit.Before;
@ -52,7 +55,9 @@ import static org.apache.solr.handler.TestReplicationHandler.invokeReplicationCo
public class TestReplicationHandlerDiskOverFlow extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String expectedErr = "Search is temporarily disabled";
Function<String, Long> originalDiskSpaceprovider = null;
BooleanSupplier originalTestWait = null;
JettySolrRunner masterJetty, slaveJetty;
SolrClient masterClient, slaveClient;
@ -62,6 +67,9 @@ public class TestReplicationHandlerDiskOverFlow extends SolrTestCaseJ4 {
@Before
public void setUp() throws Exception {
originalDiskSpaceprovider = IndexFetcher.usableDiskSpaceProvider;
originalTestWait = IndexFetcher.testWait;
super.setUp();
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
String factory = random().nextInt(100) < 75 ? "solr.NRTCachingDirectoryFactory" : "solr.StandardDirectoryFactory"; // test the default most of the time
@ -91,83 +99,106 @@ public class TestReplicationHandlerDiskOverFlow extends SolrTestCaseJ4 {
slaveClient.close();
masterClient = slaveClient = null;
System.clearProperty("solr.indexfetcher.sotimeout");
IndexFetcher.usableDiskSpaceProvider = originalDiskSpaceprovider;
IndexFetcher.testWait = originalTestWait;
}
@Test
public void testDiskOverFlow() throws Exception {
invokeReplicationCommand(slaveJetty.getLocalPort(), "disablepoll");
//index docs
System.out.println("MASTER");
log.info("Indexing to MASTER");
int docsInMaster = 1000;
long szMaster = indexDocs(masterClient, docsInMaster, 0);
System.out.println("SLAVE");
log.info("Indexing to SLAVE");
long szSlave = indexDocs(slaveClient, 1200, 1000);
Function<String, Long> originalDiskSpaceprovider = IndexFetcher.usableDiskSpaceProvider;
IndexFetcher.usableDiskSpaceProvider = new Function<String, Long>() {
@Override
public Long apply(String s) {
return szMaster;
}
};
QueryResponse response;
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean searchDisabledFound = new AtomicBoolean(false);
try {
// we don't need/want the barrier to be cyclic, so we use a ref that our barrier action will null
// out to prevent it from being triggered multiple times (which shouldn't happen anyway)
final AtomicReference<CyclicBarrier> commonBarrier = new AtomicReference<>();
commonBarrier.set(new CyclicBarrier(2, () -> { commonBarrier.set(null); }));
final List<Throwable> threadFailures = new ArrayList<>(7);
IndexFetcher.testWait = new BooleanSupplier() {
@Override
public boolean getAsBoolean() {
try {
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
final CyclicBarrier barrier = commonBarrier.get();
if (null != barrier) {
barrier.await(60, TimeUnit.SECONDS);
}
} catch (Exception e) {
log.error("IndexFetcher Thread Failure", e);
threadFailures.add(e);
}
return true;
}
};
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
for (int i = 0; i < 100; i++) {
final CyclicBarrier barrier = commonBarrier.get();
assertNotNull("why is query thread still looping if barrier has already been cleared?",
barrier);
try {
QueryResponse rsp = slaveClient.query(new SolrQuery()
.setQuery("*:*")
.setRows(0));
Thread.sleep(100);
Thread.sleep(200);
} catch (SolrException e) {
// TODO: SOLR-13469: why is this FORBIDDEN(403) and not SERVICE_UNAVAILABLE(503)
if (e.code() == SolrException.ErrorCode.FORBIDDEN.code
// TODO: // && e.getMessage().contains(expectedErr) // why is this not always set?
) {
log.info("Got expected exception", e);
// now let the barrier complete & clear itself, and we're done
barrier.await(60, TimeUnit.SECONDS);
return; // break out
}
// else...
// not our expected exception, re-throw to fail fast...
throw e;
}
}
// if we made it this far, something is wrong...
throw new RuntimeException("Query thread gave up waiting for expected error: " + expectedErr);
} catch (Exception e) {
if (e.getMessage().contains("Search is temporarily disabled")) {
searchDisabledFound.set(true);
}
latch.countDown();
break;
}
log.error("Query Thread Failure", e);
threadFailures.add(e);
}
}).start();
response = slaveClient.query(new SolrQuery()
QueryResponse response = slaveClient.query(new SolrQuery()
.add("qt", "/replication")
.add("command", CMD_FETCH_INDEX)
.add("wait", "true")
);
assertEquals("Replication command status",
"OK", response._getStr("status", null));
} finally {
IndexFetcher.usableDiskSpaceProvider = originalDiskSpaceprovider;
}
assertTrue(searchDisabledFound.get());
assertEquals("OK", response._getStr("status", null));
// System.out.println("MASTER INDEX: " + szMaster);
// System.out.println("SLAVE INDEX: " + szSlave);
assertEquals("threads encountered failures (see logs for when)",
Collections.EMPTY_LIST, threadFailures);
response = slaveClient.query(new SolrQuery().setQuery("*:*").setRows(0));
assertEquals(docsInMaster, response.getResults().getNumFound());
assertEquals("docs in slave", docsInMaster, response.getResults().getNumFound());
response = slaveClient.query(new SolrQuery()
.add("qt", "/replication")
.add("command", ReplicationHandler.CMD_DETAILS)
);
System.out.println("DETAILS" + Utils.writeJson(response, new StringWriter(), true).toString());
assertEquals("true", response._getStr("details/slave/clearedLocalIndexFirst", null));
log.info("DETAILS" + Utils.writeJson(response, new StringWriter(), true).toString());
assertEquals("slave's clearedLocalIndexFirst (from rep details)",
"true", response._getStr("details/slave/clearedLocalIndexFirst", null));
}
private long indexDocs(SolrClient client, int totalDocs, int start) throws Exception {