From c607f548bb1f17b091800ecd4821dfe8cb3a9771 Mon Sep 17 00:00:00 2001
From: Mark Robert Miller
Date: Tue, 14 Aug 2012 17:16:43 +0000
Subject: [PATCH] solrcloud: improve some logging, improve some testing, other minor tweaks

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1372981 13f79535-47bb-0310-9956-ffa450edef68
---
 .../solrj/embedded/JettySolrRunner.java       | 19 ++++++++-
 .../java/org/apache/solr/cloud/Overseer.java  |  2 +
 .../apache/solr/cloud/RecoveryStrategy.java   | 40 ++++++++++++++-----
 .../solr/core/CachingDirectoryFactory.java    |  6 ++-
 .../solr/handler/ReplicationHandler.java      |  4 +-
 .../org/apache/solr/handler/SnapPuller.java   | 14 +++----
 .../solr/update/DefaultSolrCoreState.java     | 10 +++--
 .../cloud/ChaosMonkeyNothingIsSafeTest.java   | 11 +++--
 .../solr/cloud/ChaosMonkeySafeLeaderTest.java |  6 +--
 .../solr/cloud/AbstractDistribZkTestBase.java |  2 +
 .../org/apache/solr/cloud/ChaosMonkey.java    | 19 ++++++---
 .../org/apache/solr/cloud/ZkTestServer.java   |  1 +
 solr/testlogging.properties                   |  2 +
 13 files changed, 101 insertions(+), 35 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 0729704c468..2f31cb58624 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -19,6 +19,9 @@ package org.apache.solr.client.solrj.embedded;
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Random;
 
 import javax.servlet.DispatcherType;
@@ -27,7 +30,8 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.solr.servlet.SolrDispatchFilter;
-import org.eclipse.jetty.server.*;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.bio.SocketConnector;
 import org.eclipse.jetty.server.handler.GzipHandler;
 import org.eclipse.jetty.server.session.HashSessionIdManager;
@@ -43,6 +47,8 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
  * @since solr 1.3
  */
 public class JettySolrRunner {
+  static Map RUNNING_JETTIES = new HashMap();
+
   Server server;
 
   FilterHolder dispatchFilter;
@@ -202,6 +208,7 @@ public class JettySolrRunner {
 
     if (!server.isRunning()) {
       server.start();
+      RUNNING_JETTIES.put(this, new RuntimeException());
     }
     synchronized (JettySolrRunner.this) {
       int cnt = 0;
@@ -220,9 +227,19 @@ public class JettySolrRunner {
   public void stop() throws Exception {
     if (!server.isStopped() && !server.isStopping()) {
       server.stop();
+      RUNNING_JETTIES.remove(this);
     }
     server.join();
   }
+
+  public static void assertStoppedJetties() {
+    if (RUNNING_JETTIES.size() > 0) {
+      Iterator stacktraces = RUNNING_JETTIES.values().iterator();
+      Exception cause = null;
+      cause = stacktraces.next();
+      throw new RuntimeException("Found a bad one!", cause);
+    }
+  }
 
   /**
    * Returns the Local Port of the jetty Server.
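The JettySolrRunner hunk above registers every started runner in the static RUNNING_JETTIES map, using a RuntimeException created at start() time purely to capture a stack trace, so that a teardown assertion can point at whichever test leaked a server. A minimal standalone sketch of that leak-tracking pattern is shown below; RunningServerTracker and its method names are illustrative, not Solr API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RunningServerTracker {

  // Each live server is mapped to an exception whose stack trace records where start() was called.
  private static final Map<Object,Exception> RUNNING = new ConcurrentHashMap<Object,Exception>();

  public static void markStarted(Object server) {
    RUNNING.put(server, new RuntimeException("server started here"));
  }

  public static void markStopped(Object server) {
    RUNNING.remove(server);
  }

  // Called from test teardown: fails fast and chains the recorded start-time stack
  // trace as the cause, so the leaking test is easy to find in the failure report.
  public static void assertAllStopped() {
    if (!RUNNING.isEmpty()) {
      throw new RuntimeException(RUNNING.size() + " server(s) were never stopped",
          RUNNING.values().iterator().next());
    }
  }
}

In this patch the same three hooks are inlined into JettySolrRunner.start(), JettySolrRunner.stop(), and assertStoppedJetties(), with AbstractDistribZkTestBase.tearDown() (further down) acting as the teardown caller.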
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index f3c41e3251e..0163af9e2e7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -479,9 +479,11 @@ public class Overseer {
     isClosed = true;
     if (updaterThread != null) {
       updaterThread.close();
+      updaterThread.interrupt();
     }
     if (ccThread != null) {
       ccThread.close();
+      ccThread.interrupt();
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index d760381dedf..cb33b9f4d46 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -139,7 +139,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
     solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
 
     if (isClosed()) retries = INTERRUPTED;
-    boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure force=true does not download files we already have
+    boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making force=true not download files we already have?
 
     if (!success) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
@@ -292,9 +292,6 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
 
     while (!successfulRecovery && !isClosed() && !isInterrupted()) { // don't use interruption or it will close channels though
       try {
-        // first thing we just try to sync
-        zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
-
         CloudDescriptor cloudDesc = core.getCoreDescriptor()
             .getCloudDescriptor();
         ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
@@ -305,6 +302,19 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
         String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl,
             leaderCoreName);
 
+        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+
+        boolean isLeader = leaderUrl.equals(ourUrl);
+        if (isLeader) {
+          // we are now the leader - no one else must have been suitable
+          log.warn("We have not yet recovered - but we are now the leader! core=" + coreName);
+          log.info("Finished recovery process. core=" + coreName);
+          zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
+          return;
+        }
+
+        zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
+
         sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName);
 
 
@@ -358,7 +368,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
          log.info("Begin buffering updates. core=" + coreName);
          ulog.bufferUpdates();
          replayed = false;
-          
+
+//          // open a new IndexWriter - we don't want any background merges ongoing
+//          // also ensures something like NRTCachingDirectory is flushed
+//          boolean forceNewIndexDir = false;
+//          try {
+//            core.getUpdateHandler().newIndexWriter(false);
+//          } catch (Throwable t) {
+//            SolrException.log(log, "Could not read the current index - replicating to a new directory", t);
+//            // something is wrong with the index
+//            // we need to force using a new index directory
+//            forceNewIndexDir = true;
+//          }
+//
          try {
            replicate(zkController.getNodeName(), core,
@@ -377,7 +399,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
            log.warn("Recovery was interrupted", e);
            retries = INTERRUPTED;
          } catch (Throwable t) {
-            log.error("Error while trying to recover", t);
+            SolrException.log(log, "Error while trying to recover", t);
          } finally {
            if (!replayed) {
              try {
@@ -390,7 +412,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
 
          }
        } catch (Throwable t) {
-          log.error("Error while trying to recover. core=" + coreName, t);
+          SolrException.log(log, "Error while trying to recover. core=" + coreName, t);
        }
 
        if (!successfulRecovery) {
@@ -405,7 +427,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
          if (retries == INTERRUPTED) {
 
          } else {
-            log.error("Recovery failed - max retries exceeded. core=" + coreName);
+            SolrException.log(log, "Recovery failed - max retries exceeded. core=" + coreName);
            recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
                core.getCoreDescriptor());
          }
@@ -413,7 +435,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
 
      }
    } catch (Exception e) {
-      log.error("core=" + coreName, e);
+      SolrException.log(log, "core=" + coreName, e);
    }
 
    try {
diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index 349926e71e0..e98fd5fcfb2 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -105,7 +105,11 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
   public void close() throws IOException {
     synchronized (this) {
       for (CacheValue val : byDirectoryCache.values()) {
-        val.directory.close();
+        try {
+          val.directory.close();
+        } catch (Throwable t) {
+          SolrException.log(log, "Error closing directory", t);
+        }
       }
       byDirectoryCache.clear();
       byPathCache.clear();
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index 41e0317dbb8..7e0271b7a50 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -283,7 +283,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
 
   private volatile SnapPuller tempSnapPuller;
 
-  public boolean doFetch(SolrParams solrParams, boolean force) {
+  public boolean doFetch(SolrParams solrParams, boolean forceReplication) {
     String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
     if (!snapPullLock.tryLock())
       return false;
@@ -294,7 +294,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
         nl.remove(SnapPuller.POLL_INTERVAL);
         tempSnapPuller = new SnapPuller(nl, this, core);
       }
-      return tempSnapPuller.fetchLatestIndex(core, force);
+      return tempSnapPuller.fetchLatestIndex(core, forceReplication);
     } catch (Exception e) {
       SolrException.log(LOG, "SnapPull failed ", e);
     } finally {
diff --git a/solr/core/src/java/org/apache/solr/handler/SnapPuller.java b/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
index f6160969c5d..72d52263f0a 100644
--- a/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
+++ b/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
@@ -243,12 +243,11 @@ public class SnapPuller {
    * downloaded. It also downloads the conf files (if they are modified).
    *
    * @param core the SolrCore
-   * @param force force a replication in all cases
+   * @param forceReplication force a replication in all cases
    * @return true on success, false if slave is already in sync
    * @throws IOException if an exception occurs
    */
-  @SuppressWarnings("unchecked")
-  boolean fetchLatestIndex(SolrCore core, boolean force) throws IOException, InterruptedException {
+  boolean fetchLatestIndex(SolrCore core, boolean forceReplication) throws IOException, InterruptedException {
    successfulInstall = false;
    replicationStartTime = System.currentTimeMillis();
    try {
@@ -278,7 +277,7 @@ public class SnapPuller {
      }
 
      if (latestVersion == 0L) {
-        if (force && commit.getGeneration() != 0) {
+        if (forceReplication && commit.getGeneration() != 0) {
          // since we won't get the files for an empty index,
          // we just clear ours and commit
          RefCounted iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
@@ -297,7 +296,7 @@ public class SnapPuller {
        return true;
      }
 
-      if (!force && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
+      if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
        //master and slave are already in sync just return
        LOG.info("Slave in sync with master.");
        successfulInstall = true;
@@ -318,10 +317,11 @@ public class SnapPuller {
      filesDownloaded = Collections.synchronizedList(new ArrayList>());
      // if the generateion of master is older than that of the slave , it means they are not compatible to be copied
      // then a new index direcory to be created and all the files need to be copied
-      boolean isFullCopyNeeded = IndexDeletionPolicyWrapper.getCommitTimestamp(commit) >= latestVersion || force;
+      boolean isFullCopyNeeded = IndexDeletionPolicyWrapper.getCommitTimestamp(commit) >= latestVersion || forceReplication;
      File tmpIndexDir = createTempindexDir(core);
-      if (isIndexStale())
+      if (isIndexStale()) {
        isFullCopyNeeded = true;
+      }
      LOG.info("Starting download to " + tmpIndexDir + " fullCopy=" + isFullCopyNeeded);
      successfulInstall = false;
      boolean deleteTmpIdxDir = true;
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index 0aec42e3747..67b43448ed3 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -97,12 +97,14 @@ public final class DefaultSolrCoreState extends SolrCoreState {
 
  @Override
  public synchronized void newIndexWriter(SolrCore core, boolean rollback) throws IOException {
-    
+    log.info("Creating new IndexWriter...");
+    String coreName = core.getName();
    synchronized (writerPauseLock) {
      // we need to wait for the Writer to fall out of use
      // first lets stop it from being lent out
      pauseWriter = true;
      // then lets wait until its out of use
+      log.info("Waiting until IndexWriter is unused... core=" + coreName);
      while (!writerFree) {
        try {
          writerPauseLock.wait();
@@ -112,14 +114,15 @@
    try {
      if (indexWriter != null) {
        try {
+          log.info("Closing old IndexWriter... core=" + coreName);
          indexWriter.close();
        } catch (Throwable t) {
-          SolrException.log(log, "Error closing old IndexWriter", t);
+          SolrException.log(log, "Error closing old IndexWriter. core=" + coreName, t);
        }
      }
 
-      indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2", false, true);
+      log.info("New IndexWriter is ready to be used.");
 
      // we need to null this so it picks up the new writer next get call
      refCntWriter = null;
    } finally {
@@ -136,6 +139,7 @@ public final class DefaultSolrCoreState extends SolrCoreState {
    refCnt--;
    if (refCnt == 0) {
      try {
+        log.info("SolrCoreState ref count has reached 0 - closing IndexWriter");
        if (closer != null) {
          closer.closeWriter(indexWriter);
        } else if (indexWriter != null) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
index 7e4f7e3498d..c53198d0be8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
 public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
   public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class);
 
-  private static final int BASE_RUN_LENGTH = 180000;
+  private static final int BASE_RUN_LENGTH = 45000;
 
   @BeforeClass
   public static void beforeSuperClass() {
@@ -71,8 +71,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
 
   public ChaosMonkeyNothingIsSafeTest() {
     super();
-    sliceCount = 2;
-    shardCount = 6;
+    sliceCount = 3;
+    shardCount = 12;
   }
 
   @Override
@@ -105,6 +105,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
        searchThread.start();
      }
 
+      // TODO: only do this randomly - if we don't do it, compare against control below
      FullThrottleStopableIndexingThread ftIndexThread = new FullThrottleStopableIndexingThread(
          clients, i * 50000, true);
      threads.add(ftIndexThread);
@@ -137,7 +138,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
      Thread.sleep(2000);
 
      // wait until there are no recoveries...
-      waitForThingsToLevelOut(Math.round((runLength / 1000.0f / 5.0f)));
+      waitForThingsToLevelOut(Integer.MAX_VALUE);//Math.round((runLength / 1000.0f / 3.0f)));
 
      // make sure we again have leaders for each shard
      for (int j = 1; j < sliceCount; j++) {
@@ -151,6 +152,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
      zkStateReader.updateClusterState(true);
      assertTrue(zkStateReader.getClusterState().getLiveNodes().size() > 0);
 
+      // we don't currently check vs control because the full throttle thread can
+      // have request fails
      checkShardConsistency(false, true);
 
      // ensure we have added more than 0 docs
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
index 1ddec4e9336..0dec1724fae 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
@@ -69,8 +69,8 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
 
  public ChaosMonkeySafeLeaderTest() {
    super();
-    sliceCount = atLeast(2);
-    shardCount = atLeast(sliceCount*2);
+    sliceCount = 3;//atLeast(2);
+    shardCount = 12;//atLeast(sliceCount*2);
  }
 
  @Override
@@ -114,7 +114,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
 
    // try and wait for any replications and what not to finish...
 
-    waitForThingsToLevelOut(Math.round((runLength / 1000.0f / 5.0f)));
+    waitForThingsToLevelOut(Integer.MAX_VALUE);//Math.round((runLength / 1000.0f / 3.0f)));
 
    checkShardConsistency(true, true);
 
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
index 6c1a683a2b6..bc85b099a75 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
@@ -203,6 +203,8 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
    System.clearProperty("solr.test.sys.prop2");
    resetExceptionIgnores();
    super.tearDown();
+
+    JettySolrRunner.assertStoppedJetties();
  }
 
  protected void printLayout() throws Exception {
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
index 3dc82926192..3bd76327f3d 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
@@ -68,6 +68,8 @@ public class ChaosMonkey {
  private boolean aggressivelyKillLeaders;
  private Map shardToLeaderJetty;
  private long startTime;
+
+  private Thread monkeyThread;
 
  public ChaosMonkey(ZkTestServer zkServer, ZkStateReader zkStateReader,
      String collection, Map> shardToJetty,
@@ -355,7 +357,7 @@ public class ChaosMonkey {
    // TODO: when kill leaders is on, lets kill a higher percentage of leaders
    stop = false;
-    new Thread() {
+    monkeyThread = new Thread() {
      private List deadPool = new ArrayList();
 
      @Override
      public void run() {
@@ -413,7 +415,8 @@ public class ChaosMonkey {
            + ". I also expired " + expires.get() + " and caused " + connloss + " connection losses");
      }
-    }.start();
+    };
+    monkeyThread.start();
  }
 
  public static void monkeyLog(String msg) {
@@ -422,6 +425,11 @@ public class ChaosMonkey {
 
  public void stopTheMonkey() {
    stop = true;
+    try {
+      monkeyThread.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
  }
 
  public int getStarts() {
@@ -435,17 +443,18 @@ public class ChaosMonkey {
  public static boolean start(JettySolrRunner jetty) throws Exception {
    try {
      jetty.start();
-    } catch (BindException e) {
+    } catch (Exception e) {
      jetty.stop();
      Thread.sleep(2000);
      try {
        jetty.start();
-      } catch (BindException e2) {
+      } catch (Exception e2) {
        jetty.stop();
        Thread.sleep(5000);
        try {
          jetty.start();
-        } catch (BindException e3) {
+        } catch (Exception e3) {
+          log.error("", e3);
          // we coud not get the port
          jetty.stop();
          return false;
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 91b8c52799f..048e6c20f0a 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -240,6 +240,7 @@ public class ZkTestServer {
      }
      cnt++;
    }
+    log.info("start zk server on port:" + port);
  }
 
  @SuppressWarnings("deprecation")
diff --git a/solr/testlogging.properties b/solr/testlogging.properties
index 53af3746a0f..1a3e3ad4948 100644
--- a/solr/testlogging.properties
+++ b/solr/testlogging.properties
@@ -11,6 +11,8 @@
 java.util.logging.ConsoleHandler.formatter=org.apache.solr.SolrLogFormatter
 #org.apache.solr.update.processor.DistributedUpdateProcessor=FINEST
 #org.apache.solr.update.PeerSync.level=FINEST
 #org.apache.solr.cloud.RecoveryStrategy.level=FINEST
+#org.apache.solr.cloud.SyncStrategy.level=FINEST
+#org.apache.solr.update.DefaultSolrCoreState.level=FINEST
 #org.apache.solr.update.UpdateLog.level=FINE
 #org.apache.solr.update.TransactionLog.level=FINEST
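The ChaosMonkey.start(JettySolrRunner) change above widens the catch from BindException to Exception and retries a failed start twice, sleeping 2s and then 5s, before logging and giving up. A standalone sketch of that retry shape, written against a hypothetical Startable interface rather than Solr's JettySolrRunner (all names here are illustrative, not Solr API):

public class RetryStart {

  public interface Startable {
    void start() throws Exception;
    void stop() throws Exception;
  }

  // Returns true if the server came up within the allowed attempts, false otherwise.
  public static boolean startWithRetries(Startable server, long... pausesMs) throws Exception {
    try {
      server.start();
      return true;
    } catch (Exception first) {
      for (long pause : pausesMs) {
        server.stop();        // clean up the failed attempt and release the port
        Thread.sleep(pause);  // give the old socket time to be released
        try {
          server.start();
          return true;
        } catch (Exception retry) {
          // fall through and try again with the next, longer pause
        }
      }
      server.stop();
      return false;           // caller decides whether a server that never came back is fatal
    }
  }
}

A call equivalent to the hunk above would be startWithRetries(jetty, 2000, 5000); returning false mirrors ChaosMonkey.start(), which leaves it to the test to decide how to treat a node that could not be restarted.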