SOLR-3995: Recovery may never finish on SolrCore shutdown if the last reference to a SolrCore is closed by the recovery process

SOLR-3994: Create more extensive tests around unloading cores.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1402393 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2012-10-26 05:18:51 +00:00
parent 78b1e07dc9
commit 80b39a3ba7
10 changed files with 99 additions and 29 deletions
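
The changes below follow one pattern, sketched here with illustrative names (this is not the actual RecoveryStrategy/SolrCore code): the recovery retry loop also bails out when its owner is closed, and the shutdown/unload path cancels recovery before the final close, so a failing recovery can no longer spin forever against a core that is going away.

```java
// Minimal sketch of the pattern applied in this commit; class and method names
// are illustrative, not the real Solr APIs.
class RecoverySketch extends Thread {

  private volatile boolean closed = false;   // written by the closer, read by this thread

  @Override
  public void run() {
    boolean successful = false;
    // Retry until recovery succeeds, the thread is interrupted, or the owning
    // core is shutting down -- the extra closed check is the essence of the fix.
    while (!successful && !isInterrupted() && !closed) {
      successful = tryRecoverOnce();
      if (!successful && !closed) {
        try {
          Thread.sleep(1000);                   // back off before the next attempt
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();   // keep the interrupt; loop condition exits
        }
      }
    }
  }

  // Called from the shutdown/unload path *before* the last core reference is
  // released, so the recovery thread cannot outlive the core it recovers.
  void cancelRecovery() throws InterruptedException {
    closed = true;
    join();
  }

  private boolean tryRecoverOnce() {
    return false;  // placeholder for the real replication / peer-sync work
  }
}
```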

View File

@@ -113,6 +113,8 @@ Bug Fixes
* SOLR-3992: QuerySenderListener doesn't populate document cache.
  (Shotaro Kamio, yonik)
+ * SOLR-3995: Recovery may never finish on SolrCore shutdown if the last reference to
+   a SolrCore is closed by the recovery process. (Mark Miller)
Other Changes
----------------------

View File

@@ -17,6 +17,7 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.update.UpdateLog;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
 */
public abstract class ElectionContext {
+  private static Logger log = LoggerFactory.getLogger(ElectionContext.class);
final String electionPath;
final ZkNodeProps leaderProps;
final String id;
@@ -58,7 +59,12 @@ public abstract class ElectionContext {
  public void close() {}
  public void cancelElection() throws InterruptedException, KeeperException {
-    zkClient.delete(leaderSeqPath, -1, true);
+    try {
+      zkClient.delete(leaderSeqPath, -1, true);
+    } catch (NoNodeException e) {
+      // fine
+      log.warn("cancelElection did not find election node to remove");
+    }
  }
abstract void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException;
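
The cancelElection() change above amounts to a "delete if present" on the ephemeral election node: if the node is already gone (for example because the session expired or another cleanup ran first), that is fine and only worth a warning. A small sketch of the same idiom against the plain ZooKeeper client (leaderSeqPath and NoNodeException are the names from the diff; the helper below is illustrative, not a Solr API):

```java
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooKeeper;

final class ZkCleanup {
  /** Delete a node, treating "already deleted" as success. Illustrative helper only. */
  static void deleteIfExists(ZooKeeper zk, String path)
      throws KeeperException, InterruptedException {
    try {
      zk.delete(path, -1);      // -1 = match any version
    } catch (NoNodeException e) {
      // fine: the ephemeral node was already removed, e.g. by session expiry
    }
  }
}
```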

View File

@@ -84,6 +84,10 @@ public class LeaderElector {
    sortSeqs(seqs);
    List<Integer> intSeqs = getSeqs(seqs);
+    if (intSeqs.size() == 0) {
+      log.warn("Our node is no longer in line to be leader");
+      return;
+    }
if (seq <= intSeqs.get(0)) {
// first we delete the node advertising the old leader in case the ephem is still there
try {

View File

@@ -313,7 +313,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
      }
    }
-    while (!successfulRecovery && !isInterrupted()) { // don't use interruption or it will close channels though
+    while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor()
.getCloudDescriptor();

View File

@@ -1072,7 +1072,9 @@ public class CoreContainer
  synchronized(cores) {
    SolrCore core = cores.remove( name );
-    coreToOrigName.remove(core);
+    if (core != null) {
+      coreToOrigName.remove(core);
+    }
return core;
}

View File

@@ -668,7 +668,12 @@ public class CoreAdminHandler extends RequestHandlerBase {
      });
    }
  } finally {
-    if (core != null) core.close();
+    if (core != null) {
+      if (coreContainer.getZkController() != null) {
+        core.getSolrCoreState().cancelRecovery();
+      }
+      core.close();
+    }
  }
}
return coreContainer.isPersistent();
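
The unload change above establishes an ordering: in SolrCloud mode, stop any in-flight recovery before the final core.close(), because a recovery still running at that point can hold the last reference to the core and keep retrying against it. A schematic sketch of that shutdown ordering, with hypothetical names rather than the real CoreAdminHandler/SolrCore API:

```java
// Hypothetical handle type standing in for a SolrCore-like resource.
interface CoreHandle {
  void cancelRecovery();  // ask the recovery thread to stop
  void close();           // release this reference; the last close frees the core
}

final class UnloadSketch {
  // Mirrors the finally block in the diff: recovery is cancelled first (only
  // relevant in cloud mode, where recovery exists), then the core is closed.
  static void unload(CoreHandle core, boolean cloudMode) {
    try {
      // ... unregister the core, clean up ZooKeeper state, etc.
    } finally {
      if (core != null) {
        if (cloudMode) {
          core.cancelRecovery();  // otherwise close() can be outlived by recovery
        }
        core.close();
      }
    }
  }
}
```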

View File

@@ -44,7 +44,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
  private volatile boolean recoveryRunning;
  private RecoveryStrategy recoveryStrat;
-  private boolean closed = false;
+  private volatile boolean closed = false;
private RefCounted<IndexWriter> refCntWriter;
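
The single-word change above (closed becoming volatile) is about cross-thread visibility: the flag is set by the thread closing or unloading the core and polled by the recovery path, and without volatile the Java memory model does not guarantee the reading thread ever observes the write. A minimal, generic illustration of that guarantee (plain Java, not the Solr class):

```java
// A worker polls a shared stop flag; another thread sets it. Because the flag
// is volatile, the worker is guaranteed to see the write and exit its loop;
// with a plain boolean the loop could legally spin forever.
public class StopFlagDemo {
  private volatile boolean closed = false;

  public void workUntilClosed() {
    while (!closed) {
      Thread.yield();  // stand-in for a unit of recovery work
    }
    System.out.println("worker saw closed=true and stopped");
  }

  public void close() {
    closed = true;
  }

  public static void main(String[] args) throws InterruptedException {
    final StopFlagDemo demo = new StopFlagDemo();
    Thread worker = new Thread(new Runnable() {
      @Override
      public void run() {
        demo.workUntilClosed();
      }
    });
    worker.start();
    Thread.sleep(100);
    demo.close();
    worker.join();  // returns promptly because the volatile write is visible
  }
}
```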

View File

@@ -67,7 +67,6 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
@@ -322,19 +321,19 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
    // would be better if these where all separate tests - but much, much
    // slower
-    // doOptimisticLockingAndUpdating();
-    // testMultipleCollections();
-    // testANewCollectionInOneInstance();
-    // testSearchByCollectionName();
-    // testANewCollectionInOneInstanceWithManualShardAssignement();
-    // testNumberOfCommitsWithCommitAfterAdd();
-    //
-    // testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-explicit");
-    // testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-implicit");
-    //
-    // testCollectionsAPI();
+    doOptimisticLockingAndUpdating();
+    testMultipleCollections();
+    testANewCollectionInOneInstance();
+    testSearchByCollectionName();
+    testANewCollectionInOneInstanceWithManualShardAssignement();
+    testNumberOfCommitsWithCommitAfterAdd();
+    testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-explicit");
+    testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-implicit");
+    testCollectionsAPI();
+    testCoreUnloadAndLeaders();
+    testUnloadLotsOfCores();
// Thread.sleep(10000000000L);
if (DEBUG) {
super.printLayout();
@@ -412,8 +411,6 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
    createCmd.setDataDir(core3dataDir);
    server.request(createCmd);
-    Thread.sleep(1000);
waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
// so that we start with some versions when we reload...
@@ -474,8 +471,6 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
    createCmd.setDataDir(core4dataDir);
    server.request(createCmd);
-    Thread.sleep(1000);
waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
// unload the leader again
@@ -509,9 +504,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
    createCmd.setCollection("unloadcollection");
    createCmd.setDataDir(core1DataDir);
    server.request(createCmd);
-    Thread.sleep(1000);
waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
@@ -538,7 +531,65 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
    assertEquals(found3, found4);
  }
+  private void testUnloadLotsOfCores() throws Exception {
+    SolrServer client = clients.get(2);
+    String url3 = getBaseUrl(client);
+    final HttpSolrServer server = new HttpSolrServer(url3);
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+        5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new DefaultSolrThreadFactory("testExecutor"));
+    int cnt = atLeast(6);
+    for (int i = 0; i < cnt; i++) {
+      final int freezeI = i;
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          Create createCmd = new Create();
+          createCmd.setCoreName("multiunload" + freezeI);
+          createCmd.setCollection("multiunload");
+          String core3dataDir = dataDir.getAbsolutePath() + File.separator
+              + System.currentTimeMillis() + "unloadcollection" + "_3n" + freezeI;
+          createCmd.setDataDir(core3dataDir);
+          try {
+            server.request(createCmd);
+          } catch (SolrServerException e) {
+            throw new RuntimeException(e);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+    executor.shutdown();
+    executor.awaitTermination(120, TimeUnit.SECONDS);
+    executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new DefaultSolrThreadFactory("testExecutor"));
+    for (int j = 0; j < cnt; j++) {
+      final int freezeJ = j;
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          Unload unloadCmd = new Unload(true);
+          unloadCmd.setCoreName("multiunload" + freezeJ);
+          try {
+            server.request(unloadCmd);
+          } catch (SolrServerException e) {
+            throw new RuntimeException(e);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+      Thread.sleep(random().nextInt(50));
+    }
+    executor.shutdown();
+    executor.awaitTermination(120, TimeUnit.SECONDS);
+  }
private String getBaseUrl(SolrServer client) {
String url2 = ((HttpSolrServer) client).getBaseURL()
@@ -794,7 +845,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
}
Thread.sleep(50);
}
printLayout();
fail("Could not find the new collection - " + exp.code() + " : " + collectionClient.getBaseURL());
}

View File

@@ -211,7 +211,6 @@ public class CloudSolrServer extends SolrServer {
    if ((sendToLeaders && leaderUrlList == null) || (!sendToLeaders
        && urlList == null)
        || clusterState.hashCode() != this.lastClusterStateHashCode) {
-      System.out.println("build a new map for " + collection);
// build a map of unique nodes
// TODO: allow filtering by group, role, etc
Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();

View File

@@ -117,6 +117,7 @@ public class ClusterState implements JSONWriter.Writable {
   */
  public Replica getShardProps(final String collection, final String coreNodeName) {
    Map<String, Slice> slices = getSlices(collection);
+    if (slices == null) return null;
for(Slice slice: slices.values()) {
if(slice.getReplicasMap().get(coreNodeName)!=null) {
return slice.getReplicasMap().get(coreNodeName);