SOLR-7291: Test indexing on ZK disconnect with ChaosMonkey tests

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1669026 13f79535-47bb-0310-9956-ffa450edef68
Ramkumar Aiyengar 2015-03-25 00:03:33 +00:00
parent f678ac5d9d
commit 83c0c952b6
10 changed files with 164 additions and 121 deletions

View File: solr/CHANGES.txt

@@ -387,6 +387,8 @@ Other Changes
* SOLR-6673: MDC based logging of collection, shard, replica, core
(Ishan Chattopadhyaya , Noble Paul)
* SOLR-7291: Test indexing on ZK disconnect with ChaosMonkey tests (Ramkumar Aiyengar)
================== 5.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File: ChaosMonkeyNothingIsSafeTest.java

@@ -115,11 +115,15 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
numShards = sliceCount + random().nextInt(TEST_NIGHTLY ? 12 : 2) + 1;
}
fixShardCount(numShards);
// None of the operations used here are particularly costly, so this should work.
// Using this low timeout will also help us catch index stalling.
clientSoTimeout = 5000;
}
@Test
public void test() throws Exception {
boolean testsSuccesful = false;
boolean testSuccessful = false;
try {
handle.clear();
handle.put("timestamp", SKIPVAL);
@@ -135,12 +139,12 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
// as it's not supported for recovery
del("*:*");
List<StopableThread> threads = new ArrayList<>();
List<StopableIndexingThread> indexTreads = new ArrayList<>();
List<StoppableThread> threads = new ArrayList<>();
List<StoppableIndexingThread> indexTreads = new ArrayList<>();
int threadCount = TEST_NIGHTLY ? 3 : 1;
int i = 0;
for (i = 0; i < threadCount; i++) {
StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true);
StoppableIndexingThread indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), true);
threads.add(indexThread);
indexTreads.add(indexThread);
indexThread.start();
@@ -149,7 +153,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
threadCount = 1;
i = 0;
for (i = 0; i < threadCount; i++) {
StopableSearchThread searchThread = new StopableSearchThread();
StoppableSearchThread searchThread = new StoppableSearchThread(cloudClient);
threads.add(searchThread);
searchThread.start();
}
@@ -158,7 +162,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
// it's currently hard to know what requests failed when using ConcurrentSolrUpdateServer
boolean runFullThrottle = random().nextBoolean();
if (runFullThrottle) {
FullThrottleStopableIndexingThread ftIndexThread = new FullThrottleStopableIndexingThread(
FullThrottleStoppableIndexingThread ftIndexThread = new FullThrottleStoppableIndexingThread(
clients, "ft1", true);
threads.add(ftIndexThread);
ftIndexThread.start();
@@ -185,14 +189,17 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
chaosMonkey.stopTheMonkey();
}
for (StopableThread indexThread : threads) {
// ideally this should go into chaosMonkey
restartZk(1000 * (5 + random().nextInt(4)));
for (StoppableThread indexThread : threads) {
indexThread.safeStop();
}
// start any downed jetties to be sure we still will end up with a leader per shard...
// wait for stop...
for (StopableThread indexThread : threads) {
for (StoppableThread indexThread : threads) {
indexThread.join();
}
@@ -217,9 +224,11 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
// we expect full throttle fails, but cloud client should not easily fail
for (StopableThread indexThread : threads) {
if (indexThread instanceof StopableIndexingThread && !(indexThread instanceof FullThrottleStopableIndexingThread)) {
assertFalse("There were too many update fails - we expect it can happen, but shouldn't easily", ((StopableIndexingThread) indexThread).getFailCount() > FAIL_TOLERANCE);
for (StoppableThread indexThread : threads) {
if (indexThread instanceof StoppableIndexingThread && !(indexThread instanceof FullThrottleStoppableIndexingThread)) {
int failCount = ((StoppableIndexingThread) indexThread).getFailCount();
assertFalse("There were too many update fails (" + failCount + " > " + FAIL_TOLERANCE
+ ") - we expect it can happen, but shouldn't easily", failCount > FAIL_TOLERANCE);
}
}
@@ -247,12 +256,9 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
// sometimes we restart zookeeper as well
if (random().nextBoolean()) {
zkServer.shutdown();
zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
zkServer.run();
restartZk(1000 * (5 + random().nextInt(4)));
}
try (CloudSolrClient client = createCloudClient("collection1")) {
createCollection(null, "testcollection",
1, 1, 1, client, null, "conf1");
@@ -263,31 +269,31 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
numShardsNumReplicas.add(1);
checkForCollection("testcollection", numShardsNumReplicas, null);
testsSuccesful = true;
testSuccessful = true;
} finally {
if (!testsSuccesful) {
if (!testSuccessful) {
printLayout();
}
}
}
private Set<String> getAddFails(List<StopableIndexingThread> threads) {
private Set<String> getAddFails(List<StoppableIndexingThread> threads) {
Set<String> addFails = new HashSet<String>();
for (StopableIndexingThread thread : threads) {
for (StoppableIndexingThread thread : threads) {
addFails.addAll(thread.getAddFails());
}
return addFails;
}
private Set<String> getDeleteFails(List<StopableIndexingThread> threads) {
private Set<String> getDeleteFails(List<StoppableIndexingThread> threads) {
Set<String> deleteFails = new HashSet<String>();
for (StopableIndexingThread thread : threads) {
for (StoppableIndexingThread thread : threads) {
deleteFails.addAll(thread.getDeleteFails());
}
return deleteFails;
}
class FullThrottleStopableIndexingThread extends StopableIndexingThread {
class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
private CloseableHttpClient httpClient = HttpClientUtil.createClient(null);
private volatile boolean stop = false;
int clientIndex = 0;
@@ -295,14 +301,14 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
private List<SolrClient> clients;
private AtomicInteger fails = new AtomicInteger();
public FullThrottleStopableIndexingThread(List<SolrClient> clients,
public FullThrottleStoppableIndexingThread(List<SolrClient> clients,
String id, boolean doDeletes) {
super(controlClient, cloudClient, id, doDeletes);
setName("FullThrottleStopableIndexingThread");
setDaemon(true);
this.clients = clients;
HttpClientUtil.setConnectionTimeout(httpClient, 15000);
HttpClientUtil.setSoTimeout(httpClient, 15000);
HttpClientUtil.setConnectionTimeout(httpClient, clientConnectionTimeout);
HttpClientUtil.setSoTimeout(httpClient, clientSoTimeout);
cusc = new ConcurrentUpdateSolrClient(
((HttpSolrClient) clients.get(0)).getBaseURL(), httpClient, 8,
2) {

View File: ChaosMonkeySafeLeaderTest.java

@@ -110,7 +110,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
tryDelete();
List<StopableIndexingThread> threads = new ArrayList<>();
List<StoppableIndexingThread> threads = new ArrayList<>();
int threadCount = 2;
int batchSize = 1;
if (random().nextBoolean()) {
@@ -126,7 +126,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
}
for (int i = 0; i < threadCount; i++) {
StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, maxUpdates, batchSize, pauseBetweenUpdates); // random().nextInt(999) + 1
StoppableIndexingThread indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, maxUpdates, batchSize, pauseBetweenUpdates); // random().nextInt(999) + 1
threads.add(indexThread);
indexThread.start();
}
@@ -152,16 +152,16 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
chaosMonkey.stopTheMonkey();
}
for (StopableIndexingThread indexThread : threads) {
for (StoppableIndexingThread indexThread : threads) {
indexThread.safeStop();
}
// wait for stop...
for (StopableIndexingThread indexThread : threads) {
for (StoppableIndexingThread indexThread : threads) {
indexThread.join();
}
for (StopableIndexingThread indexThread : threads) {
for (StoppableIndexingThread indexThread : threads) {
assertEquals(0, indexThread.getFailCount());
}

View File: RecoveryZkTest.java

@@ -34,8 +34,8 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase {
//private static final String DISTRIB_UPDATE_CHAIN = "distrib-update-chain";
private static Logger log = LoggerFactory.getLogger(RecoveryZkTest.class);
private StopableIndexingThread indexThread;
private StopableIndexingThread indexThread2;
private StoppableIndexingThread indexThread;
private StoppableIndexingThread indexThread2;
public RecoveryZkTest() {
super();
@@ -72,10 +72,10 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase {
maxDoc = maxDocNightlyList[random().nextInt(maxDocList.length - 1)];
}
indexThread = new StopableIndexingThread(controlClient, cloudClient, "1", true, maxDoc, 1, true);
indexThread = new StoppableIndexingThread(controlClient, cloudClient, "1", true, maxDoc, 1, true);
indexThread.start();
indexThread2 = new StopableIndexingThread(controlClient, cloudClient, "2", true, maxDoc, 1, true);
indexThread2 = new StoppableIndexingThread(controlClient, cloudClient, "2", true, maxDoc, 1, true);
indexThread2.start();

View File: HdfsWriteToMultipleCollectionsTest.java

@@ -29,7 +29,7 @@ import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.cloud.BasicDistributedZkTest;
import org.apache.solr.cloud.StopableIndexingThread;
import org.apache.solr.cloud.StoppableIndexingThread;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.HdfsDirectoryFactory;
import org.apache.solr.core.SolrCore;
@@ -100,18 +100,18 @@ public class HdfsWriteToMultipleCollectionsTest extends BasicDistributedZkTest {
waitForRecoveriesToFinish(ACOLLECTION + i, false);
}
List<CloudSolrClient> cloudClients = new ArrayList<>();
List<StopableIndexingThread> threads = new ArrayList<>();
List<StoppableIndexingThread> threads = new ArrayList<>();
for (int i = 0; i < cnt; i++) {
CloudSolrClient client = new CloudSolrClient(zkServer.getZkAddress());
client.setDefaultCollection(ACOLLECTION + i);
cloudClients.add(client);
StopableIndexingThread indexThread = new StopableIndexingThread(null, client, "1", true, docCount, 1, true);
StoppableIndexingThread indexThread = new StoppableIndexingThread(null, client, "1", true, docCount, 1, true);
threads.add(indexThread);
indexThread.start();
}
int addCnt = 0;
for (StopableIndexingThread thread : threads) {
for (StoppableIndexingThread thread : threads) {
thread.join();
addCnt += thread.getNumAdds() - thread.getNumDeletes();
}

View File: BaseDistributedSearchTestCase.java

@@ -230,6 +230,8 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
protected boolean verifyStress = true;
protected int nThreads = 3;
protected int clientConnectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
protected int clientSoTimeout = 90000;
public static int ORDERED = 1;
public static int SKIP = 2;
@@ -435,8 +437,8 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
try {
// setup the client...
HttpSolrClient client = new HttpSolrClient(buildUrl(port) + "/" + DEFAULT_TEST_CORENAME);
client.setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
client.setSoTimeout(90000);
client.setConnectionTimeout(clientConnectionTimeout);
client.setSoTimeout(clientSoTimeout);
client.setDefaultMaxConnectionsPerHost(100);
client.setMaxTotalConnections(100);
return client;
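The timeouts used by createNewSolrClient are now read from the protected clientConnectionTimeout and clientSoTimeout fields rather than hard-coded, so subclasses can tighten them. A minimal sketch of how a subclass might do this, mirroring the 5 second socket timeout ChaosMonkeyNothingIsSafeTest sets above (the subclass name and values here are illustrative, not part of this commit):

public class MyZkDisconnectTest extends AbstractFullDistribZkTestBase {
  public MyZkDisconnectTest() {
    super();
    // Illustrative values: a low socket timeout makes a stalled update fail
    // quickly instead of hanging for the 90 second default.
    clientConnectionTimeout = 15000;
    clientSoTimeout = 5000;
  }
}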

View File: AbstractDistribZkTestBase.java

@@ -232,4 +232,13 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
zkClient.printLayoutToStdOut();
zkClient.close();
}
protected void restartZk(int pauseMillis) throws Exception {
log.info("Restarting ZK with a pause of {}ms in between", pauseMillis);
zkServer.shutdown();
// disconnect enough to test stalling, if things stall, then clientSoTimeout w""ill be hit
Thread.sleep(pauseMillis);
zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
zkServer.run();
}
}
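The new restartZk helper bounces the test ZooKeeper server with a configurable pause, long enough for a genuine indexing stall to exceed the lowered clientSoTimeout. A rough sketch of the call pattern the chaos test above uses (the surrounding loop is shown only for context):

// restart ZK while the indexing threads are still running; the 5-8 second
// pause is long enough that a stalled update would hit clientSoTimeout
restartZk(1000 * (5 + random().nextInt(4)));

// then stop and join the threads as before
for (StoppableThread thread : threads) {
  thread.safeStop();
}
for (StoppableThread thread : threads) {
  thread.join();
}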

View File: AbstractFullDistribZkTestBase.java

@@ -77,7 +77,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -128,7 +127,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
protected boolean checkCreatedVsState;
protected boolean useJettyDataDir = true;
protected Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
protected Map<URI,SocketProxy> proxies = new HashMap<>();
public static class CloudJettyRunner {
public JettySolrRunner jetty;
@@ -207,14 +206,11 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
if (isSSLMode()) {
System.clearProperty("urlScheme");
ZkStateReader zkStateReader = new ZkStateReader(zkServer.getZkAddress(),
AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT);
try {
try (ZkStateReader zkStateReader = new ZkStateReader(zkServer.getZkAddress(),
AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT)) {
zkStateReader.getZkClient().create(ZkStateReader.CLUSTER_PROPS,
ZkStateReader.toJSON(Collections.singletonMap("urlScheme","https")),
ZkStateReader.toJSON(Collections.singletonMap("urlScheme", "https")),
CreateMode.PERSISTENT, true);
} finally {
zkStateReader.close();
}
}
}
@@ -1386,16 +1382,13 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
.getNumFound();
// do some really inefficient mapping...
ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), 10000,
AbstractZkTestCase.TIMEOUT);
Map<String,Slice> slices = null;
ClusterState clusterState;
try {
try (ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), 10000,
AbstractZkTestCase.TIMEOUT)) {
zk.createClusterStateWatchersAndUpdate();
clusterState = zk.getClusterState();
slices = clusterState.getSlicesMap(DEFAULT_COLLECTION);
} finally {
zk.close();
}
if (slices == null) {
@@ -1445,67 +1438,16 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
if (r.nextBoolean()) params.set("collection", DEFAULT_COLLECTION);
QueryResponse rsp = cloudClient.query(params);
return rsp;
return cloudClient.query(params);
}
static abstract class StopableThread extends Thread {
public StopableThread(String name) {
static abstract class StoppableThread extends Thread {
public StoppableThread(String name) {
super(name);
}
public abstract void safeStop();
}
class StopableSearchThread extends StopableThread {
private volatile boolean stop = false;
protected final AtomicInteger queryFails = new AtomicInteger();
private String[] QUERIES = new String[] {"to come","their country","aid","co*"};
public StopableSearchThread() {
super("StopableSearchThread");
setDaemon(true);
}
@Override
public void run() {
Random random = random();
int numSearches = 0;
while (true && !stop) {
numSearches++;
try {
//to come to the aid of their country.
cloudClient.query(new SolrQuery(QUERIES[random.nextInt(QUERIES.length)]));
} catch (Exception e) {
System.err.println("QUERY REQUEST FAILED:");
e.printStackTrace();
if (e instanceof SolrServerException) {
System.err.println("ROOT CAUSE:");
((SolrServerException) e).getRootCause().printStackTrace();
}
queryFails.incrementAndGet();
}
try {
Thread.sleep(random.nextInt(4000) + 300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log.info("num searches done:" + numSearches + " with " + queryFails + " fails");
}
@Override
public void safeStop() {
stop = true;
}
public int getFails() {
return queryFails.get();
}
};
public void waitForThingsToLevelOut(int waitForRecTimeSeconds) throws Exception {
log.info("Wait for recoveries to finish - wait " + waitForRecTimeSeconds + " for each attempt");
int cnt = 0;
@@ -1934,14 +1876,14 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
REPLICATION_FACTOR, replicationFactor,
MAX_SHARDS_PER_NODE, maxShardsPerNode,
NUM_SLICES, numShards);
Map<String,List<Integer>> collectionInfos = new HashMap<String,List<Integer>>();
Map<String,List<Integer>> collectionInfos = new HashMap<>();
createCollection(collectionInfos, collName, props, client);
}
protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, String shardId, int shards, int rf, int maxWaitSecs) throws Exception {
long startMs = System.currentTimeMillis();
Map<String,Replica> notLeaders = new HashMap<String,Replica>();
Map<String,Replica> notLeaders = new HashMap<>();
ZkStateReader zkr = cloudClient.getZkStateReader();
zkr.updateClusterState(true); // force the state to be fresh
@@ -2001,7 +1943,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
long diffMs = (System.currentTimeMillis() - startMs);
log.info("Took " + diffMs + " ms to see all replicas become active.");
List<Replica> replicas = new ArrayList<Replica>();
List<Replica> replicas = new ArrayList<>();
replicas.addAll(notLeaders.values());
return replicas;
}
@@ -2017,7 +1959,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
if (collection != null) {
cs = clusterState.getCollection(collection).toString();
} else {
Map<String,DocCollection> map = new HashMap<String,DocCollection>();
Map<String,DocCollection> map = new HashMap<>();
for (String coll : clusterState.getCollections())
map.put(coll, clusterState.getCollection(coll));
CharArr out = new CharArr();

View File: StoppableIndexingThread.java (class renamed from StopableIndexingThread)

@@ -28,7 +28,7 @@ import org.apache.solr.common.SolrInputDocument;
* limitations under the License.
*/
public class StopableIndexingThread extends AbstractFullDistribZkTestBase.StopableThread {
public class StoppableIndexingThread extends AbstractFullDistribZkTestBase.StoppableThread {
static String t1 = "a_t";
static String i1 = "a_i";
private volatile boolean stop = false;
@@ -46,12 +46,12 @@ public class StopableIndexingThread extends AbstractFullDistribZkTestBase.Stopab
private int batchSize;
private boolean pauseBetweenUpdates;
public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes) {
public StoppableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes) {
this(controlClient, cloudClient, id, doDeletes, -1, 1, true);
}
public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes, int numCycles, int batchSize, boolean pauseBetweenUpdates) {
super("StopableIndexingThread");
public StoppableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes, int numCycles, int batchSize, boolean pauseBetweenUpdates) {
super("StoppableIndexingThread");
this.controlClient = controlClient;
this.cloudClient = cloudClient;
this.id = id;

View File: StoppableSearchThread.java (new file)

@@ -0,0 +1,82 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class StoppableSearchThread extends AbstractFullDistribZkTestBase.StoppableThread {
static Logger log = LoggerFactory.getLogger(AbstractFullDistribZkTestBase.class);
private final CloudSolrClient cloudClient;
private volatile boolean stop = false;
protected final AtomicInteger queryFails = new AtomicInteger();
private String[] QUERIES = new String[] {"to come","their country","aid","co*"};
public StoppableSearchThread(CloudSolrClient cloudClient) {
super("StoppableSearchThread");
this.cloudClient = cloudClient;
setDaemon(true);
}
@Override
public void run() {
Random random = LuceneTestCase.random();
int numSearches = 0;
while (true && !stop) {
numSearches++;
try {
//to come to the aid of their country.
cloudClient.query(new SolrQuery(QUERIES[random.nextInt(QUERIES.length)]));
} catch (Exception e) {
System.err.println("QUERY REQUEST FAILED:");
e.printStackTrace();
if (e instanceof SolrServerException) {
System.err.println("ROOT CAUSE:");
((SolrServerException) e).getRootCause().printStackTrace();
}
queryFails.incrementAndGet();
}
try {
Thread.sleep(random.nextInt(4000) + 300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log.info("num searches done:" + numSearches + " with " + queryFails + " fails");
}
@Override
public void safeStop() {
stop = true;
}
public int getFails() {
return queryFails.get();
}
}
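StoppableSearchThread is now a top-level class that takes its CloudSolrClient explicitly, instead of living as an inner class of AbstractFullDistribZkTestBase and reading the field directly. A minimal usage sketch under that assumption (variable handling is illustrative, not lifted from any one test):

StoppableSearchThread searchThread = new StoppableSearchThread(cloudClient);
searchThread.start();

// ... chaos monkey actions and indexing threads run here ...

searchThread.safeStop();
searchThread.join();
// query failures are tolerated during chaos; getFails() exposes the count
log.info("search thread saw {} query failures", searchThread.getFails());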