solrcloud: improve some logging, improve some testing, other minor tweaks

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1372981 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-08-14 17:16:43 +00:00
parent 2573a23996
commit c607f548bb
13 changed files with 101 additions and 35 deletions

View File

@ -19,6 +19,9 @@ package org.apache.solr.client.solrj.embedded;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import javax.servlet.DispatcherType;
@ -27,7 +30,8 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.eclipse.jetty.server.*;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.bio.SocketConnector;
import org.eclipse.jetty.server.handler.GzipHandler;
import org.eclipse.jetty.server.session.HashSessionIdManager;
@ -43,6 +47,8 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
* @since solr 1.3
*/
public class JettySolrRunner {
static Map<JettySolrRunner,Exception> RUNNING_JETTIES = new HashMap<JettySolrRunner,Exception>();
Server server;
FilterHolder dispatchFilter;
@ -202,6 +208,7 @@ public class JettySolrRunner {
if (!server.isRunning()) {
server.start();
RUNNING_JETTIES.put(this, new RuntimeException());
}
synchronized (JettySolrRunner.this) {
int cnt = 0;
@ -220,9 +227,19 @@ public class JettySolrRunner {
public void stop() throws Exception {
if (!server.isStopped() && !server.isStopping()) {
server.stop();
RUNNING_JETTIES.remove(this);
}
server.join();
}
public static void assertStoppedJetties() {
if (RUNNING_JETTIES.size() > 0) {
Iterator<Exception> stacktraces = RUNNING_JETTIES.values().iterator();
Exception cause = null;
cause = stacktraces.next();
throw new RuntimeException("Found a bad one!", cause);
}
}
/**
* Returns the Local Port of the jetty Server.

View File

@ -479,9 +479,11 @@ public class Overseer {
isClosed = true;
if (updaterThread != null) {
updaterThread.close();
updaterThread.interrupt();
}
if (ccThread != null) {
ccThread.close();
ccThread.interrupt();
}
}

View File

@ -139,7 +139,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
if (isClosed()) retries = INTERRUPTED;
boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure force=true does not download files we already have
boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making force=true not download files we already have?
if (!success) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
@ -292,9 +292,6 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
while (!successfulRecovery && !isClosed() && !isInterrupted()) { // don't use interruption or it will close channels though
try {
// first thing we just try to sync
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
CloudDescriptor cloudDesc = core.getCoreDescriptor()
.getCloudDescriptor();
ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
@ -305,6 +302,19 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
boolean isLeader = leaderUrl.equals(ourUrl);
if (isLeader) {
// we are now the leader - no one else must have been suitable
log.warn("We have not yet recovered - but we are now the leader! core=" + coreName);
log.info("Finished recovery process. core=" + coreName);
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
return;
}
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName);
@ -358,7 +368,19 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
log.info("Begin buffering updates. core=" + coreName);
ulog.bufferUpdates();
replayed = false;
// // open a new IndexWriter - we don't want any background merges ongoing
// // also ensures something like NRTCachingDirectory is flushed
// boolean forceNewIndexDir = false;
// try {
// core.getUpdateHandler().newIndexWriter(false);
// } catch (Throwable t) {
// SolrException.log(log, "Could not read the current index - replicating to a new directory", t);
// // something is wrong with the index
// // we need to force using a new index directory
// forceNewIndexDir = true;
// }
//
try {
replicate(zkController.getNodeName(), core,
@ -377,7 +399,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
log.warn("Recovery was interrupted", e);
retries = INTERRUPTED;
} catch (Throwable t) {
log.error("Error while trying to recover", t);
SolrException.log(log, "Error while trying to recover", t);
} finally {
if (!replayed) {
try {
@ -390,7 +412,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
}
} catch (Throwable t) {
log.error("Error while trying to recover. core=" + coreName, t);
SolrException.log(log, "Error while trying to recover. core=" + coreName, t);
}
if (!successfulRecovery) {
@ -405,7 +427,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
if (retries == INTERRUPTED) {
} else {
log.error("Recovery failed - max retries exceeded. core=" + coreName);
SolrException.log(log, "Recovery failed - max retries exceeded. core=" + coreName);
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
}
@ -413,7 +435,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
}
} catch (Exception e) {
log.error("core=" + coreName, e);
SolrException.log(log, "core=" + coreName, e);
}
try {

View File

@ -105,7 +105,11 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
public void close() throws IOException {
synchronized (this) {
for (CacheValue val : byDirectoryCache.values()) {
val.directory.close();
try {
val.directory.close();
} catch (Throwable t) {
SolrException.log(log, "Error closing directory", t);
}
}
byDirectoryCache.clear();
byPathCache.clear();

View File

@ -283,7 +283,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private volatile SnapPuller tempSnapPuller;
public boolean doFetch(SolrParams solrParams, boolean force) {
public boolean doFetch(SolrParams solrParams, boolean forceReplication) {
String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
if (!snapPullLock.tryLock())
return false;
@ -294,7 +294,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
nl.remove(SnapPuller.POLL_INTERVAL);
tempSnapPuller = new SnapPuller(nl, this, core);
}
return tempSnapPuller.fetchLatestIndex(core, force);
return tempSnapPuller.fetchLatestIndex(core, forceReplication);
} catch (Exception e) {
SolrException.log(LOG, "SnapPull failed ", e);
} finally {

View File

@ -243,12 +243,11 @@ public class SnapPuller {
* downloaded. It also downloads the conf files (if they are modified).
*
* @param core the SolrCore
* @param force force a replication in all cases
* @param forceReplication force a replication in all cases
* @return true on success, false if slave is already in sync
* @throws IOException if an exception occurs
*/
@SuppressWarnings("unchecked")
boolean fetchLatestIndex(SolrCore core, boolean force) throws IOException, InterruptedException {
boolean fetchLatestIndex(SolrCore core, boolean forceReplication) throws IOException, InterruptedException {
successfulInstall = false;
replicationStartTime = System.currentTimeMillis();
try {
@ -278,7 +277,7 @@ public class SnapPuller {
}
if (latestVersion == 0L) {
if (force && commit.getGeneration() != 0) {
if (forceReplication && commit.getGeneration() != 0) {
// since we won't get the files for an empty index,
// we just clear ours and commit
RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
@ -297,7 +296,7 @@ public class SnapPuller {
return true;
}
if (!force && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
//master and slave are already in sync just return
LOG.info("Slave in sync with master.");
successfulInstall = true;
@ -318,10 +317,11 @@ public class SnapPuller {
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generateion of master is older than that of the slave , it means they are not compatible to be copied
// then a new index direcory to be created and all the files need to be copied
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper.getCommitTimestamp(commit) >= latestVersion || force;
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper.getCommitTimestamp(commit) >= latestVersion || forceReplication;
File tmpIndexDir = createTempindexDir(core);
if (isIndexStale())
if (isIndexStale()) {
isFullCopyNeeded = true;
}
LOG.info("Starting download to " + tmpIndexDir + " fullCopy=" + isFullCopyNeeded);
successfulInstall = false;
boolean deleteTmpIdxDir = true;

View File

@ -97,12 +97,14 @@ public final class DefaultSolrCoreState extends SolrCoreState {
@Override
public synchronized void newIndexWriter(SolrCore core, boolean rollback) throws IOException {
log.info("Creating new IndexWriter...");
String coreName = core.getName();
synchronized (writerPauseLock) {
// we need to wait for the Writer to fall out of use
// first lets stop it from being lent out
pauseWriter = true;
// then lets wait until its out of use
log.info("Waiting until IndexWriter is unused... core=" + coreName);
while (!writerFree) {
try {
writerPauseLock.wait();
@ -112,14 +114,15 @@ public final class DefaultSolrCoreState extends SolrCoreState {
try {
if (indexWriter != null) {
try {
log.info("Closing old IndexWriter... core=" + coreName);
indexWriter.close();
} catch (Throwable t) {
SolrException.log(log, "Error closing old IndexWriter", t);
SolrException.log(log, "Error closing old IndexWriter. core=" + coreName, t);
}
}
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2",
false, true);
log.info("New IndexWriter is ready to be used.");
// we need to null this so it picks up the new writer next get call
refCntWriter = null;
} finally {
@ -136,6 +139,7 @@ public final class DefaultSolrCoreState extends SolrCoreState {
refCnt--;
if (refCnt == 0) {
try {
log.info("SolrCoreState ref count has reached 0 - closing IndexWriter");
if (closer != null) {
closer.closeWriter(indexWriter);
} else if (indexWriter != null) {

View File

@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class);
private static final int BASE_RUN_LENGTH = 180000;
private static final int BASE_RUN_LENGTH = 45000;
@BeforeClass
public static void beforeSuperClass() {
@ -71,8 +71,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
public ChaosMonkeyNothingIsSafeTest() {
super();
sliceCount = 2;
shardCount = 6;
sliceCount = 3;
shardCount = 12;
}
@Override
@ -105,6 +105,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
searchThread.start();
}
// TODO: only do this randomly - if we don't do it, compare against control below
FullThrottleStopableIndexingThread ftIndexThread = new FullThrottleStopableIndexingThread(
clients, i * 50000, true);
threads.add(ftIndexThread);
@ -137,7 +138,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
Thread.sleep(2000);
// wait until there are no recoveries...
waitForThingsToLevelOut(Math.round((runLength / 1000.0f / 5.0f)));
waitForThingsToLevelOut(Integer.MAX_VALUE);//Math.round((runLength / 1000.0f / 3.0f)));
// make sure we again have leaders for each shard
for (int j = 1; j < sliceCount; j++) {
@ -151,6 +152,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
zkStateReader.updateClusterState(true);
assertTrue(zkStateReader.getClusterState().getLiveNodes().size() > 0);
// we dont't current check vs control because the full throttle thread can
// have request fails
checkShardConsistency(false, true);
// ensure we have added more than 0 docs

View File

@ -69,8 +69,8 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
public ChaosMonkeySafeLeaderTest() {
super();
sliceCount = atLeast(2);
shardCount = atLeast(sliceCount*2);
sliceCount = 3;//atLeast(2);
shardCount = 12;//atLeast(sliceCount*2);
}
@Override
@ -114,7 +114,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
// try and wait for any replications and what not to finish...
waitForThingsToLevelOut(Math.round((runLength / 1000.0f / 5.0f)));
waitForThingsToLevelOut(Integer.MAX_VALUE);//Math.round((runLength / 1000.0f / 3.0f)));
checkShardConsistency(true, true);

View File

@ -203,6 +203,8 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
System.clearProperty("solr.test.sys.prop2");
resetExceptionIgnores();
super.tearDown();
JettySolrRunner.assertStoppedJetties();
}
protected void printLayout() throws Exception {

View File

@ -68,6 +68,8 @@ public class ChaosMonkey {
private boolean aggressivelyKillLeaders;
private Map<String,CloudJettyRunner> shardToLeaderJetty;
private long startTime;
private Thread monkeyThread;
public ChaosMonkey(ZkTestServer zkServer, ZkStateReader zkStateReader,
String collection, Map<String,List<CloudJettyRunner>> shardToJetty,
@ -355,7 +357,7 @@ public class ChaosMonkey {
// TODO: when kill leaders is on, lets kill a higher percentage of leaders
stop = false;
new Thread() {
monkeyThread = new Thread() {
private List<CloudJettyRunner> deadPool = new ArrayList<CloudJettyRunner>();
@Override
@ -413,7 +415,8 @@ public class ChaosMonkey {
+ ". I also expired " + expires.get() + " and caused " + connloss
+ " connection losses");
}
}.start();
};
monkeyThread.start();
}
public static void monkeyLog(String msg) {
@ -422,6 +425,11 @@ public class ChaosMonkey {
public void stopTheMonkey() {
stop = true;
try {
monkeyThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public int getStarts() {
@ -435,17 +443,18 @@ public class ChaosMonkey {
public static boolean start(JettySolrRunner jetty) throws Exception {
try {
jetty.start();
} catch (BindException e) {
} catch (Exception e) {
jetty.stop();
Thread.sleep(2000);
try {
jetty.start();
} catch (BindException e2) {
} catch (Exception e2) {
jetty.stop();
Thread.sleep(5000);
try {
jetty.start();
} catch (BindException e3) {
} catch (Exception e3) {
log.error("", e3);
// we coud not get the port
jetty.stop();
return false;

View File

@ -240,6 +240,7 @@ public class ZkTestServer {
}
cnt++;
}
log.info("start zk server on port:" + port);
}
@SuppressWarnings("deprecation")

View File

@ -11,6 +11,8 @@ java.util.logging.ConsoleHandler.formatter=org.apache.solr.SolrLogFormatter
#org.apache.solr.update.processor.DistributedUpdateProcessor=FINEST
#org.apache.solr.update.PeerSync.level=FINEST
#org.apache.solr.cloud.RecoveryStrategy.level=FINEST
#org.apache.solr.cloud.SyncStrategy.level=FINEST
#org.apache.solr.update.DefaultSolrCoreState.level=FINEST
#org.apache.solr.update.UpdateLog.level=FINE
#org.apache.solr.update.TransactionLog.level=FINEST