diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 50ddfafef11..4dbef8a2c9b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.SafeStopThread;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -46,7 +47,7 @@ import org.apache.solr.update.UpdateLog.RecoveryInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RecoveryStrategy extends Thread {
+public class RecoveryStrategy extends Thread implements SafeStopThread {
   private static final int MAX_RETRIES = 500;
   private static final int INTERRUPTED = MAX_RETRIES + 1;
   private static final int START_TIMEOUT = 100;
@@ -191,8 +192,14 @@ public class RecoveryStrategy extends Thread {
     }
     
     log.info("Sync Recovery was not successful - trying replication");
     UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-    if (ulog == null) return;
+    if (ulog == null) {
+      SolrException.log(log, "No UpdateLog found - cannot recover");
+      recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+          core.getCoreDescriptor());
+      return;
+    }
+    
     log.info("Begin buffering updates");
     ulog.bufferUpdates();
     replayed = false;
@@ -296,4 +303,8 @@ public class RecoveryStrategy extends Thread {
     return future;
   }
   
+  public boolean isClosed() {
+    return close;
+  }
+  
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java b/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
index 2f9b7f3b5ed..850d3510635 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
@@ -18,15 +18,23 @@ package org.apache.solr.cloud;
  */
 
 import java.io.File;
+import java.util.Map;
 
 import org.apache.solr.BaseDistributedSearchTestCase;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.core.SolrConfig;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 
 public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearchTestCase {
+  
+  protected static final String DEFAULT_COLLECTION = "collection1";
   private static final boolean DEBUG = false;
   protected ZkTestServer zkServer;
@@ -44,6 +52,10 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
     zkServer.run();
     
     System.setProperty("zkHost", zkServer.getZkAddress());
+    System.setProperty("enable.update.log", "true");
+    System.setProperty("remove.version.field", "true");
+    System
+        .setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
     
     AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(),
         zkServer.getZkAddress(), "solrconfig.xml", "schema.xml");
@@ -70,8 +82,83 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
     shards = sb.toString();
   }
   
+  
+  protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader, boolean verbose)
+      throws KeeperException, InterruptedException {
+    waitForRecoveriesToFinish(collection, zkStateReader, verbose, false);
+  }
+  
+  protected void waitForRecoveriesToFinish(String collection,
+      ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
+      throws KeeperException, InterruptedException {
+    boolean cont = true;
+    int cnt = 0;
+    
+    while (cont) {
+      if (verbose) System.out.println("-");
+      boolean sawLiveRecovering = false;
+      zkStateReader.updateCloudState(true);
+      CloudState cloudState = zkStateReader.getCloudState();
+      Map<String,Slice> slices = cloudState.getSlices(collection);
+      for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+        Map<String,ZkNodeProps> shards = entry.getValue().getShards();
+        for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
+          if (verbose) System.out.println("rstate:"
+              + shard.getValue().get(ZkStateReader.STATE_PROP)
+              + " live:"
+              + cloudState.liveNodesContain(shard.getValue().get(
+                  ZkStateReader.NODE_NAME_PROP)));
+          String state = shard.getValue().get(ZkStateReader.STATE_PROP);
+          if ((state.equals(ZkStateReader.RECOVERING) || state
+              .equals(ZkStateReader.SYNC))
+              && cloudState.liveNodesContain(shard.getValue().get(
+                  ZkStateReader.NODE_NAME_PROP))) {
+            sawLiveRecovering = true;
+          }
+        }
+      }
+      if (!sawLiveRecovering || cnt == 15) {
+        if (!sawLiveRecovering) {
+          if (verbose) System.out.println("no one is recovering");
+        } else {
+          if (failOnTimeout) {
+            fail("There are still nodes recovering");
+            return;
+          }
+          if (verbose) System.out
+              .println("gave up waiting for recovery to finish..");
+        }
+        cont = false;
+      } else {
+        Thread.sleep(2000);
+      }
+      cnt++;
+    }
+  }
+  
+  protected void assertAllActive(String collection, ZkStateReader zkStateReader)
+      throws KeeperException, InterruptedException {
+    
+    zkStateReader.updateCloudState(true);
+    CloudState cloudState = zkStateReader.getCloudState();
+    Map<String,Slice> slices = cloudState.getSlices(collection);
+    if (slices == null) {
+      throw new IllegalArgumentException("Cannot find collection:" + collection);
+    }
+    for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+      Map<String,ZkNodeProps> shards = entry.getValue().getShards();
+      for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
+        
+        String state = shard.getValue().get(ZkStateReader.STATE_PROP);
+        if (!state.equals(ZkStateReader.ACTIVE)) {
+          fail("Not all shards are ACTIVE");
+        }
+      }
+    }
+  }
+  
   @Override
+  @After
   public void tearDown() throws Exception {
     if (DEBUG) {
       printLayout();
     }
@@ -79,6 +166,9 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
     zkServer.shutdown();
     System.clearProperty("zkHost");
     System.clearProperty("collection");
+    System.clearProperty("enable.update.log");
+    System.clearProperty("remove.version.field");
+    System.clearProperty("solr.directoryFactory");
     System.clearProperty("solr.test.sys.prop1");
     System.clearProperty("solr.test.sys.prop2");
     resetExceptionIgnores();
@@ -93,7 +183,5 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
   
   @AfterClass
   public static void afterClass() throws InterruptedException {
-    // wait just a bit for any zk client threads to outlast timeout
-    Thread.sleep(2000);
   }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
index 558ba51ee53..6b94d5d51a0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
@@ -62,7 +62,8 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
   private Map<String,List<SolrServer>> otherCollectionClients = new HashMap<String,List<SolrServer>>();
   private Map<String,List<SolrServer>> oneInstanceCollectionClients = new HashMap<String,List<SolrServer>>();
   
-  private String oneInstanceCollection = "oneInstanceCollection";;
+  private String oneInstanceCollection = "oneInstanceCollection";
+  private String oneInstanceCollection2 = "oneInstanceCollection2";
   
   public BasicDistributedZkTest() {
     fixShardCount = true;
@@ -247,12 +248,63 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
     testMultipleCollections();
     testANewCollectionInOneInstance();
     testSearchByCollectionName();
+    testANewCollectionInOneInstanceWithManualShardAssignement();
     
     // Thread.sleep(10000000000L);
     if (DEBUG) {
       super.printLayout();
     }
   }
   
+  private void testANewCollectionInOneInstanceWithManualShardAssignement() throws Exception {
+    List<SolrServer> collectionClients = new ArrayList<SolrServer>();
+    SolrServer client = clients.get(0);
+    oneInstanceCollectionClients.put(oneInstanceCollection, collectionClients);
+    String baseUrl = ((CommonsHttpSolrServer) client).getBaseURL();
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 1, "slice1");
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 2, "slice2");
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 3, "slice2");
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 4, "slice1");
+    
+    SolrServer client1 = createNewSolrServer(oneInstanceCollection2 + "1", baseUrl);
+    SolrServer client2 = createNewSolrServer(oneInstanceCollection2 + "2", baseUrl);
+    SolrServer client3 = createNewSolrServer(oneInstanceCollection2 + "3", baseUrl);
+    SolrServer client4 = createNewSolrServer(oneInstanceCollection2 + "4", baseUrl);
+    
+    client2.add(getDoc(id, "1"));
+    client3.add(getDoc(id, "2"));
+    client4.add(getDoc(id, "3"));
+    
+    // no one should be recovering
+    waitForRecoveriesToFinish(oneInstanceCollection2, solrj.getZkStateReader(), false, true);
+    
+    assertAllActive(oneInstanceCollection2, solrj.getZkStateReader());
+    
+    client1.commit();
+    SolrQuery query = new SolrQuery("*:*");
+    query.set("distrib", false);
+    long oneDocs = client1.query(query).getResults().getNumFound();
+    long twoDocs = client2.query(query).getResults().getNumFound();
+    long threeDocs = client3.query(query).getResults().getNumFound();
+    long fourDocs = client4.query(query).getResults().getNumFound();
+    
+    query.set("collection", oneInstanceCollection2);
+    query.set("distrib", true);
+    long allDocs = solrj.query(query).getResults().getNumFound();
+    
+//    System.out.println("1:" + oneDocs);
+//    System.out.println("2:" + twoDocs);
+//    System.out.println("3:" + threeDocs);
+//    System.out.println("4:" + fourDocs);
+//    System.out.println("All Docs:" + allDocs);
+    
+    assertEquals(oneDocs, threeDocs);
+    assertEquals(twoDocs, fourDocs);
+    assertNotSame(oneDocs, twoDocs);
+    assertEquals(3, allDocs);
+    
+  }
+  
   private void testSearchByCollectionName() throws SolrServerException {
     SolrServer client = clients.get(0);
     String baseUrl = ((CommonsHttpSolrServer) client).getBaseURL();
@@ -280,6 +332,9 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
     SolrServer client3 = createNewSolrServer(oneInstanceCollection + "3", baseUrl);
     SolrServer client4 = createNewSolrServer(oneInstanceCollection + "4", baseUrl);
     
+    waitForRecoveriesToFinish(oneInstanceCollection, solrj.getZkStateReader(), false);
+    assertAllActive(oneInstanceCollection, solrj.getZkStateReader());
+    
     client2.add(getDoc(id, "1"));
     client3.add(getDoc(id, "2"));
     client4.add(getDoc(id, "3"));
@@ -311,6 +366,12 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
   private void createCollection(String collection,
       List<SolrServer> collectionClients, String baseUrl, int num)
       throws MalformedURLException, SolrServerException, IOException {
+    createCollection(collection, collectionClients, baseUrl, num, null);
+  }
+  
+  private void createCollection(String collection,
+      List<SolrServer> collectionClients, String baseUrl, int num, String shardId)
+      throws MalformedURLException, SolrServerException, IOException {
     CommonsHttpSolrServer server = new CommonsHttpSolrServer(
         baseUrl);
     Create createCmd = new Create();
@@ -319,6 +380,7 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
     createCmd.setNumShards(2);
     createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator
         + collection + num);
+    createCmd.setShardId(shardId);
     server.request(createCmd);
     collectionClients.add(createNewSolrServer(collection, baseUrl));
   }
@@ -389,12 +451,13 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
       throws MalformedURLException, SolrServerException, IOException {
     List<SolrServer> collectionClients = new ArrayList<SolrServer>();
     otherCollectionClients.put(collection, collectionClients);
+    int unique = 0;
     for (SolrServer client : clients) {
       CommonsHttpSolrServer server = new CommonsHttpSolrServer(
           ((CommonsHttpSolrServer) client).getBaseURL());
       Create createCmd = new Create();
       createCmd.setCoreName(collection);
-      createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator + collection);
+      createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator + collection + unique++);
       server.request(createCmd);
       collectionClients.add(createNewSolrServer(collection,
           ((CommonsHttpSolrServer) client).getBaseURL()));
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
index 816c5bcdb61..a93a7d73707 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
@@ -65,8 +65,6 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
   
   private static final String SHARD2 = "shard2";
   
-  protected static final String DEFAULT_COLLECTION = "collection1";
-  
   private boolean printLayoutOnTearDown = false;
   
   String t1 = "a_t";
@@ -151,16 +149,12 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
     System
         .setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
     System.setProperty("solrcloud.update.delay", "0");
-    System.setProperty("enable.update.log", "true");
-    System.setProperty("remove.version.field", "true");
   }
   
   @AfterClass
   public static void afterClass() {
     System.clearProperty("solr.directoryFactory");
     System.clearProperty("solrcloud.update.delay");
-    System.clearProperty("enable.update.log");
-    System.clearProperty("remove.version.field");
   }
   
   public FullSolrCloudTest() {
@@ -655,45 +649,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
   
   protected void waitForRecoveriesToFinish(boolean verbose)
       throws KeeperException, InterruptedException {
-    boolean cont = true;
-    int cnt = 0;
-    
-    while (cont) {
-      if (verbose) System.out.println("-");
-      boolean sawLiveRecovering = false;
-      zkStateReader.updateCloudState(true);
-      CloudState cloudState = zkStateReader.getCloudState();
-      Map<String,Slice> slices = cloudState.getSlices(DEFAULT_COLLECTION);
-      for (Map.Entry<String,Slice> entry : slices.entrySet()) {
-        Map<String,ZkNodeProps> shards = entry.getValue().getShards();
-        for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
-          if (verbose) System.out.println("rstate:"
-              + shard.getValue().get(ZkStateReader.STATE_PROP)
-              + " live:"
-              + cloudState.liveNodesContain(shard.getValue().get(
-                  ZkStateReader.NODE_NAME_PROP)));
-          String state = shard.getValue().get(ZkStateReader.STATE_PROP);
-          if ((state.equals(ZkStateReader.RECOVERING)
-              || state.equals(ZkStateReader.SYNC))
-              && cloudState.liveNodesContain(shard.getValue().get(
-                  ZkStateReader.NODE_NAME_PROP))) {
-            sawLiveRecovering = true;
-          }
-        }
-      }
-      if (!sawLiveRecovering || cnt == 10) {
-        if (!sawLiveRecovering) {
-          if (verbose) System.out.println("no one is recoverying");
-        } else {
-          if (verbose) System.out
-              .println("gave up waiting for recovery to finish..");
-        }
-        cont = false;
-      } else {
-        Thread.sleep(2000);
-      }
-      cnt++;
-    }
+    super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose);
   }
   
   private void brindDownShardIndexSomeDocsAndRecover() throws Exception,
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java
new file mode 100644
index 00000000000..7f27c07abd3
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java
@@ -0,0 +1,23 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface SafeStopThread {
+  public void stop();
+  public boolean isClosed();
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
index 19b8e2caa18..3ae23009bdf 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
@@ -71,6 +71,11 @@ public class ZkCmdExecutor {
         Thread.currentThread().interrupt();
         throw new InterruptedException();
       }
+      if (Thread.currentThread() instanceof SafeStopThread) {
+        if (((SafeStopThread) Thread.currentThread()).isClosed()) {
+          throw new RuntimeException("Interrupted");
+        }
+      }
       retryDelay(i);
     }
   }
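Reviewer note (not part of the patch): the sketch below is a minimal illustration of how the SafeStopThread contract introduced above is meant to work -- a worker thread exposes a volatile "close" flag through isClosed(), and long retry loops such as ZkCmdExecutor.retryOperation() poll that flag between attempts so they can give up promptly once the thread has been asked to stop. SafeStopThread, RecoveryStrategy, and ZkCmdExecutor are from the patch; everything else here (SketchWorker, RetrySketch, Op, retry, close()) is a hypothetical name used only for illustration.

import org.apache.solr.common.cloud.SafeStopThread;

// Hypothetical worker: like RecoveryStrategy in the patch, it extends Thread,
// implements SafeStopThread, and only needs to supply isClosed(), backed by a
// volatile close flag.
class SketchWorker extends Thread implements SafeStopThread {

  private volatile boolean close = false;

  // Cooperative shutdown: flip the flag and wake the thread if it is sleeping
  // inside a retry delay.
  public void close() {
    close = true;
    this.interrupt();
  }

  @Override
  public boolean isClosed() {
    return close;
  }

  @Override
  public void run() {
    while (!isClosed()) {
      // do one unit of recovery-style work, then re-check the flag
    }
  }
}

// Hypothetical retry helper mirroring the check the patch adds to
// ZkCmdExecutor.retryOperation(): between attempts, bail out early when the
// calling thread is a SafeStopThread that has already been closed.
class RetrySketch {

  interface Op<T> {
    T execute() throws Exception;
  }

  static <T> T retry(Op<T> op, int maxRetries, long delayMs) throws Exception {
    for (int i = 0; ; i++) {
      try {
        return op.execute();
      } catch (Exception e) {
        if (i >= maxRetries) {
          throw e; // out of attempts, surface the last failure
        }
        Thread t = Thread.currentThread();
        if (t instanceof SafeStopThread && ((SafeStopThread) t).isClosed()) {
          // The owning thread was told to stop; do not keep retrying.
          throw new RuntimeException("Giving up retries: thread is closed", e);
        }
        Thread.sleep(delayMs * (i + 1)); // simple linear back-off
      }
    }
  }
}

The instanceof check is what keeps ZkCmdExecutor decoupled: any thread that opts into SafeStopThread gets its retries cut short, without ZkCmdExecutor having to know about RecoveryStrategy specifically.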