mirror of https://github.com/apache/lucene.git
SOLR-3122: allow interrupting zkcmdoperations and some test work
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1244231 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3ba23367ea
commit
56f12201b7
|
@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
|
|||
import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.SafeStopThread;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
|
@ -46,7 +47,7 @@ import org.apache.solr.update.UpdateLog.RecoveryInfo;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class RecoveryStrategy extends Thread {
|
||||
public class RecoveryStrategy extends Thread implements SafeStopThread {
|
||||
private static final int MAX_RETRIES = 500;
|
||||
private static final int INTERRUPTED = MAX_RETRIES + 1;
|
||||
private static final int START_TIMEOUT = 100;
|
||||
|
@ -191,8 +192,14 @@ public class RecoveryStrategy extends Thread {
|
|||
}
|
||||
log.info("Sync Recovery was not successful - trying replication");
|
||||
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
|
||||
if (ulog == null) return;
|
||||
if (ulog == null) {
|
||||
SolrException.log(log, "No UpdateLog found - cannot recover");
|
||||
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
|
||||
core.getCoreDescriptor());
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Begin buffering updates");
|
||||
ulog.bufferUpdates();
|
||||
replayed = false;
|
||||
|
||||
|
@ -296,4 +303,8 @@ public class RecoveryStrategy extends Thread {
|
|||
return future;
|
||||
}
|
||||
|
||||
public boolean isClosed() {
|
||||
return close;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,15 +18,23 @@ package org.apache.solr.cloud;
|
|||
*/
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.BaseDistributedSearchTestCase;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.common.cloud.CloudState;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.core.SolrConfig;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
||||
public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearchTestCase {
|
||||
|
||||
protected static final String DEFAULT_COLLECTION = "collection1";
|
||||
private static final boolean DEBUG = false;
|
||||
protected ZkTestServer zkServer;
|
||||
|
||||
|
@ -44,6 +52,10 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
|
|||
zkServer.run();
|
||||
|
||||
System.setProperty("zkHost", zkServer.getZkAddress());
|
||||
System.setProperty("enable.update.log", "true");
|
||||
System.setProperty("remove.version.field", "true");
|
||||
System
|
||||
.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
||||
|
||||
AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(), zkServer.getZkAddress(), "solrconfig.xml", "schema.xml");
|
||||
|
||||
|
@ -71,7 +83,82 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
|
|||
shards = sb.toString();
|
||||
}
|
||||
|
||||
protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader, boolean verbose)
|
||||
throws KeeperException, InterruptedException {
|
||||
waitForRecoveriesToFinish(collection, zkStateReader, verbose, false);
|
||||
}
|
||||
|
||||
protected void waitForRecoveriesToFinish(String collection,
|
||||
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
|
||||
throws KeeperException, InterruptedException {
|
||||
boolean cont = true;
|
||||
int cnt = 0;
|
||||
|
||||
while (cont) {
|
||||
if (verbose) System.out.println("-");
|
||||
boolean sawLiveRecovering = false;
|
||||
zkStateReader.updateCloudState(true);
|
||||
CloudState cloudState = zkStateReader.getCloudState();
|
||||
Map<String,Slice> slices = cloudState.getSlices(collection);
|
||||
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
|
||||
Map<String,ZkNodeProps> shards = entry.getValue().getShards();
|
||||
for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
|
||||
if (verbose) System.out.println("rstate:"
|
||||
+ shard.getValue().get(ZkStateReader.STATE_PROP)
|
||||
+ " live:"
|
||||
+ cloudState.liveNodesContain(shard.getValue().get(
|
||||
ZkStateReader.NODE_NAME_PROP)));
|
||||
String state = shard.getValue().get(ZkStateReader.STATE_PROP);
|
||||
if ((state.equals(ZkStateReader.RECOVERING) || state
|
||||
.equals(ZkStateReader.SYNC))
|
||||
&& cloudState.liveNodesContain(shard.getValue().get(
|
||||
ZkStateReader.NODE_NAME_PROP))) {
|
||||
sawLiveRecovering = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!sawLiveRecovering || cnt == 15) {
|
||||
if (!sawLiveRecovering) {
|
||||
if (verbose) System.out.println("no one is recoverying");
|
||||
} else {
|
||||
if (failOnTimeout) {
|
||||
fail("There are still nodes recoverying");
|
||||
return;
|
||||
}
|
||||
if (verbose) System.out
|
||||
.println("gave up waiting for recovery to finish..");
|
||||
}
|
||||
cont = false;
|
||||
} else {
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
cnt++;
|
||||
}
|
||||
}
|
||||
|
||||
protected void assertAllActive(String collection,ZkStateReader zkStateReader)
|
||||
throws KeeperException, InterruptedException {
|
||||
|
||||
zkStateReader.updateCloudState(true);
|
||||
CloudState cloudState = zkStateReader.getCloudState();
|
||||
Map<String,Slice> slices = cloudState.getSlices(collection);
|
||||
if (slices == null) {
|
||||
throw new IllegalArgumentException("Cannot find collection:" + collection);
|
||||
}
|
||||
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
|
||||
Map<String,ZkNodeProps> shards = entry.getValue().getShards();
|
||||
for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
|
||||
|
||||
String state = shard.getValue().get(ZkStateReader.STATE_PROP);
|
||||
if (!state.equals(ZkStateReader.ACTIVE)) {
|
||||
fail("Not all shards are ACTIVE");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (DEBUG) {
|
||||
printLayout();
|
||||
|
@ -79,6 +166,9 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
|
|||
zkServer.shutdown();
|
||||
System.clearProperty("zkHost");
|
||||
System.clearProperty("collection");
|
||||
System.clearProperty("enable.update.log");
|
||||
System.clearProperty("remove.version.field");
|
||||
System.clearProperty("solr.directoryFactory");
|
||||
System.clearProperty("solr.test.sys.prop1");
|
||||
System.clearProperty("solr.test.sys.prop2");
|
||||
resetExceptionIgnores();
|
||||
|
@ -93,7 +183,5 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
|
|||
|
||||
@AfterClass
|
||||
public static void afterClass() throws InterruptedException {
|
||||
// wait just a bit for any zk client threads to outlast timeout
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,7 +62,8 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
|
|||
|
||||
private Map<String,List<SolrServer>> otherCollectionClients = new HashMap<String,List<SolrServer>>();
|
||||
private Map<String,List<SolrServer>> oneInstanceCollectionClients = new HashMap<String,List<SolrServer>>();
|
||||
private String oneInstanceCollection = "oneInstanceCollection";;
|
||||
private String oneInstanceCollection = "oneInstanceCollection";
|
||||
private String oneInstanceCollection2 = "oneInstanceCollection2";
|
||||
|
||||
public BasicDistributedZkTest() {
|
||||
fixShardCount = true;
|
||||
|
@ -247,12 +248,63 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
|
|||
testMultipleCollections();
|
||||
testANewCollectionInOneInstance();
|
||||
testSearchByCollectionName();
|
||||
testANewCollectionInOneInstanceWithManualShardAssignement();
|
||||
// Thread.sleep(10000000000L);
|
||||
if (DEBUG) {
|
||||
super.printLayout();
|
||||
}
|
||||
}
|
||||
|
||||
private void testANewCollectionInOneInstanceWithManualShardAssignement() throws Exception {
|
||||
List<SolrServer> collectionClients = new ArrayList<SolrServer>();
|
||||
SolrServer client = clients.get(0);
|
||||
oneInstanceCollectionClients.put(oneInstanceCollection , collectionClients);
|
||||
String baseUrl = ((CommonsHttpSolrServer) client).getBaseURL();
|
||||
createCollection(oneInstanceCollection2, collectionClients, baseUrl, 1, "slice1");
|
||||
createCollection(oneInstanceCollection2, collectionClients, baseUrl, 2, "slice2");
|
||||
createCollection(oneInstanceCollection2, collectionClients, baseUrl, 3, "slice2");
|
||||
createCollection(oneInstanceCollection2, collectionClients, baseUrl, 4, "slice1");
|
||||
|
||||
SolrServer client1 = createNewSolrServer(oneInstanceCollection2 + "1", baseUrl);
|
||||
SolrServer client2 = createNewSolrServer(oneInstanceCollection2 + "2", baseUrl);
|
||||
SolrServer client3 = createNewSolrServer(oneInstanceCollection2 + "3", baseUrl);
|
||||
SolrServer client4 = createNewSolrServer(oneInstanceCollection2 + "4", baseUrl);
|
||||
|
||||
client2.add(getDoc(id, "1"));
|
||||
client3.add(getDoc(id, "2"));
|
||||
client4.add(getDoc(id, "3"));
|
||||
|
||||
// no one should be recovering
|
||||
waitForRecoveriesToFinish(oneInstanceCollection2, solrj.getZkStateReader(), false, true);
|
||||
|
||||
assertAllActive(oneInstanceCollection2, solrj.getZkStateReader());
|
||||
|
||||
client1.commit();
|
||||
SolrQuery query = new SolrQuery("*:*");
|
||||
query.set("distrib", false);
|
||||
long oneDocs = client1.query(query).getResults().getNumFound();
|
||||
long twoDocs = client2.query(query).getResults().getNumFound();
|
||||
long threeDocs = client3.query(query).getResults().getNumFound();
|
||||
long fourDocs = client4.query(query).getResults().getNumFound();
|
||||
|
||||
query.set("collection", oneInstanceCollection2);
|
||||
query.set("distrib", true);
|
||||
long allDocs = solrj.query(query).getResults().getNumFound();
|
||||
|
||||
// System.out.println("1:" + oneDocs);
|
||||
// System.out.println("2:" + twoDocs);
|
||||
// System.out.println("3:" + threeDocs);
|
||||
// System.out.println("4:" + fourDocs);
|
||||
// System.out.println("All Docs:" + allDocs);
|
||||
|
||||
assertEquals(oneDocs, threeDocs);
|
||||
assertEquals(twoDocs, fourDocs);
|
||||
assertNotSame(oneDocs, twoDocs);
|
||||
assertEquals(3, allDocs);
|
||||
|
||||
|
||||
}
|
||||
|
||||
private void testSearchByCollectionName() throws SolrServerException {
|
||||
SolrServer client = clients.get(0);
|
||||
String baseUrl = ((CommonsHttpSolrServer) client).getBaseURL();
|
||||
|
@ -280,6 +332,9 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
|
|||
SolrServer client3 = createNewSolrServer(oneInstanceCollection + "3", baseUrl);
|
||||
SolrServer client4 = createNewSolrServer(oneInstanceCollection + "4", baseUrl);
|
||||
|
||||
waitForRecoveriesToFinish(oneInstanceCollection, solrj.getZkStateReader(), false);
|
||||
assertAllActive(oneInstanceCollection, solrj.getZkStateReader());
|
||||
|
||||
client2.add(getDoc(id, "1"));
|
||||
client3.add(getDoc(id, "2"));
|
||||
client4.add(getDoc(id, "3"));
|
||||
|
@ -311,6 +366,12 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
|
|||
private void createCollection(String collection,
|
||||
List<SolrServer> collectionClients, String baseUrl, int num)
|
||||
throws MalformedURLException, SolrServerException, IOException {
|
||||
createCollection(collection, collectionClients, baseUrl, num, null);
|
||||
}
|
||||
|
||||
private void createCollection(String collection,
|
||||
List<SolrServer> collectionClients, String baseUrl, int num, String shardId)
|
||||
throws MalformedURLException, SolrServerException, IOException {
|
||||
CommonsHttpSolrServer server = new CommonsHttpSolrServer(
|
||||
baseUrl);
|
||||
Create createCmd = new Create();
|
||||
|
@ -319,6 +380,7 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
|
|||
createCmd.setNumShards(2);
|
||||
createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator
|
||||
+ collection + num);
|
||||
createCmd.setShardId(shardId);
|
||||
server.request(createCmd);
|
||||
collectionClients.add(createNewSolrServer(collection, baseUrl));
|
||||
}
|
||||
|
@ -389,12 +451,13 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
|
|||
throws MalformedURLException, SolrServerException, IOException {
|
||||
List<SolrServer> collectionClients = new ArrayList<SolrServer>();
|
||||
otherCollectionClients.put(collection, collectionClients);
|
||||
int unique = 0;
|
||||
for (SolrServer client : clients) {
|
||||
CommonsHttpSolrServer server = new CommonsHttpSolrServer(
|
||||
((CommonsHttpSolrServer) client).getBaseURL());
|
||||
Create createCmd = new Create();
|
||||
createCmd.setCoreName(collection);
|
||||
createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator + collection);
|
||||
createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator + collection + unique++);
|
||||
server.request(createCmd);
|
||||
collectionClients.add(createNewSolrServer(collection,
|
||||
((CommonsHttpSolrServer) client).getBaseURL()));
|
||||
|
|
|
@ -65,8 +65,6 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
|
|||
|
||||
private static final String SHARD2 = "shard2";
|
||||
|
||||
protected static final String DEFAULT_COLLECTION = "collection1";
|
||||
|
||||
private boolean printLayoutOnTearDown = false;
|
||||
|
||||
String t1 = "a_t";
|
||||
|
@ -151,16 +149,12 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
|
|||
System
|
||||
.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
||||
System.setProperty("solrcloud.update.delay", "0");
|
||||
System.setProperty("enable.update.log", "true");
|
||||
System.setProperty("remove.version.field", "true");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
System.clearProperty("solr.directoryFactory");
|
||||
System.clearProperty("solrcloud.update.delay");
|
||||
System.clearProperty("enable.update.log");
|
||||
System.clearProperty("remove.version.field");
|
||||
}
|
||||
|
||||
public FullSolrCloudTest() {
|
||||
|
@ -655,45 +649,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
|
|||
|
||||
protected void waitForRecoveriesToFinish(boolean verbose)
|
||||
throws KeeperException, InterruptedException {
|
||||
boolean cont = true;
|
||||
int cnt = 0;
|
||||
|
||||
while (cont) {
|
||||
if (verbose) System.out.println("-");
|
||||
boolean sawLiveRecovering = false;
|
||||
zkStateReader.updateCloudState(true);
|
||||
CloudState cloudState = zkStateReader.getCloudState();
|
||||
Map<String,Slice> slices = cloudState.getSlices(DEFAULT_COLLECTION);
|
||||
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
|
||||
Map<String,ZkNodeProps> shards = entry.getValue().getShards();
|
||||
for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
|
||||
if (verbose) System.out.println("rstate:"
|
||||
+ shard.getValue().get(ZkStateReader.STATE_PROP)
|
||||
+ " live:"
|
||||
+ cloudState.liveNodesContain(shard.getValue().get(
|
||||
ZkStateReader.NODE_NAME_PROP)));
|
||||
String state = shard.getValue().get(ZkStateReader.STATE_PROP);
|
||||
if ((state.equals(ZkStateReader.RECOVERING)
|
||||
|| state.equals(ZkStateReader.SYNC))
|
||||
&& cloudState.liveNodesContain(shard.getValue().get(
|
||||
ZkStateReader.NODE_NAME_PROP))) {
|
||||
sawLiveRecovering = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!sawLiveRecovering || cnt == 10) {
|
||||
if (!sawLiveRecovering) {
|
||||
if (verbose) System.out.println("no one is recoverying");
|
||||
} else {
|
||||
if (verbose) System.out
|
||||
.println("gave up waiting for recovery to finish..");
|
||||
}
|
||||
cont = false;
|
||||
} else {
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
cnt++;
|
||||
}
|
||||
super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose);
|
||||
}
|
||||
|
||||
private void brindDownShardIndexSomeDocsAndRecover() throws Exception,
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
package org.apache.solr.common.cloud;
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
public interface SafeStopThread {
|
||||
public void stop();
|
||||
public boolean isClosed();
|
||||
}
|
|
@ -71,6 +71,11 @@ public class ZkCmdExecutor {
|
|||
Thread.currentThread().interrupt();
|
||||
throw new InterruptedException();
|
||||
}
|
||||
if (Thread.currentThread() instanceof SafeStopThread) {
|
||||
if (((SafeStopThread) Thread.currentThread()).isClosed()) {
|
||||
throw new RuntimeException("Interrupted");
|
||||
}
|
||||
}
|
||||
retryDelay(i);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue