diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a77c8bda34f..ef3915df01b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -88,6 +88,16 @@ Bug Fixes
 * SOLR-3961: Fixed error using LimitTokenCountFilterFactory (Jack Krupansky, hossman)
 
+* SOLR-3933: Distributed commits are not guaranteed to be ordered within a
+  request. (Mark Miller)
+
+* SOLR-3939: An empty or just replicated index cannot become the leader of a
+  shard after a leader goes down. (Joel Bernstein, yonik, Mark Miller)
+
+* SOLR-3971: A collection that is created with numShards=1 turns into a
+  numShards=2 collection after starting up a second core and not specifying
+  numShards. (Mark Miller)
+
 Other Changes
 ----------------------
@@ -100,6 +110,9 @@ Other Changes
 * SOLR-3966: Eliminate superfluous warning from LanguageIdentifierUpdateProcessor
   (Markus Jelsma via hossman)
 
+* SOLR-3932: SolrCmdDistributorTest either takes 3 seconds or 3 minutes.
+  (yonik, Mark Miller)
+
 ==================  4.0.0 ==================
 
 Versions of Major Components
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 0f52abf4e02..6f0efe79faf 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -35,6 +35,7 @@ import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.bio.SocketConnector;
 import org.eclipse.jetty.server.handler.GzipHandler;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.server.session.HashSessionIdManager;
 import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
@@ -92,6 +93,7 @@ public class JettySolrRunner {
   private void init(String solrHome, String context, int port, boolean stopAtShutdown) {
     this.context = context;
     server = new Server(port);
+    this.solrHome = solrHome;
     this.stopAtShutdown = stopAtShutdown;
     server.setStopAtShutdown(stopAtShutdown);
@@ -100,32 +102,45 @@ public class JettySolrRunner {
     }
     System.setProperty("solr.solr.home", solrHome);
     if (System.getProperty("jetty.testMode") != null) {
-      // SelectChannelConnector connector = new SelectChannelConnector();
-      // Normal SocketConnector is what solr's example server uses by default
-      SocketConnector connector = new SocketConnector();
+      SelectChannelConnector connector = new SelectChannelConnector();
       connector.setPort(port);
       connector.setReuseAddress(true);
-      if (!stopAtShutdown) {
-        QueuedThreadPool threadPool = (QueuedThreadPool) connector
-            .getThreadPool();
-        if (threadPool != null) {
+      connector.setLowResourcesMaxIdleTime(1500);
+      QueuedThreadPool threadPool = (QueuedThreadPool) connector
+          .getThreadPool();
+      if (threadPool != null) {
+        threadPool.setMaxThreads(10000);
+        threadPool.setMaxIdleTimeMs(5000);
+        if (!stopAtShutdown) {
           threadPool.setMaxStopTimeMs(100);
         }
       }
+
       server.setConnectors(new Connector[] {connector});
       server.setSessionIdManager(new HashSessionIdManager(new Random()));
     } else {
-      if (!stopAtShutdown) {
-        for (Connector connector : server.getConnectors()) {
-          if (connector instanceof SocketConnector) {
-            QueuedThreadPool threadPool = (QueuedThreadPool) ((SocketConnector) connector)
-                .getThreadPool();
-            if (threadPool != null) {
-              threadPool.setMaxStopTimeMs(100);
-            }
+
+      for (Connector connector : server.getConnectors()) {
+        QueuedThreadPool threadPool = null;
+        if (connector instanceof SocketConnector) {
+          threadPool = (QueuedThreadPool) ((SocketConnector) connector)
+              .getThreadPool();
+        }
+        if (connector instanceof SelectChannelConnector) {
+          threadPool = (QueuedThreadPool) ((SelectChannelConnector) connector)
+              .getThreadPool();
+        }
+
+        if (threadPool != null) {
+          threadPool.setMaxThreads(10000);
+          threadPool.setMaxIdleTimeMs(5000);
+          if (!stopAtShutdown) {
+            threadPool.setMaxStopTimeMs(100);
           }
         }
+      }
+
     }
 
     // Initialize the servlets
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 77417e9ee06..f4b2bea6735 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -14,6 +14,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.UpdateLog;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -162,6 +163,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     }
     
     log.info("I may be the new leader - try and sync");
+
+    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+
+
     // we are going to attempt to be the leader
     // first cancel any current recovery
     core.getUpdateHandler().getSolrCoreState().cancelRecovery();
@@ -173,6 +178,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       success = false;
     }
 
+    if (!success && ulog.getRecentUpdates().getVersions(1).isEmpty()) {
+      // we failed sync, but we have no versions - we can't sync in that case
+      // - we were active
+      // before, so become leader anyway
+      log.info("We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
+      success = true;
+    }
+
     // if !success but no one else is in active mode,
     // we are the leader anyway
     // TODO: should we also be leader if there is only one other active?
@@ -308,13 +321,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       return;
     }
     
-    log.info("There is a better leader candidate than us - going back into recovery");
-    
-    try {
-      zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
-    } catch (Throwable t) {
-      SolrException.log(log, "Error trying to publish down state", t);
-    }
+    log.info("There may be a better leader candidate than us - going back into recovery");
     
     cancelElection();
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 82bb71e8aca..8ef14b1a0ac 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -17,14 +17,11 @@ package org.apache.solr.cloud;
  * the License.
  */
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
-import org.apache.noggit.JSONUtil;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ClosableThread;
@@ -210,11 +207,11 @@ public class Overseer {
     private ClusterState updateState(ClusterState state, final ZkNodeProps message) {
       final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
       final String zkCoreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
-      final Integer numShards = message.getStr(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.getStr(ZkStateReader.NUM_SHARDS_PROP)):null;
-      
+      Integer numShards = message.getStr(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.getStr(ZkStateReader.NUM_SHARDS_PROP)):null;
+      log.info("Update state numShards={} message={}", numShards, message);
       //collection does not yet exist, create placeholders if num shards is specified
-      if (!state.getCollections().contains(collection)
-          && numShards!=null) {
+      boolean collectionExists = state.getCollections().contains(collection);
+      if (!collectionExists && numShards!=null) {
         state = createCollection(state, collection, numShards);
       }
@@ -227,6 +224,10 @@ public class Overseer {
       }
       if(sliceName == null) {
         //request new shardId
+        if (collectionExists) {
+          // use existing numShards
+          numShards = state.getCollectionStates().get(collection).size();
+        }
         sliceName = AssignShard.assignShard(collection, state, numShards);
       }
@@ -269,6 +270,8 @@ public class Overseer {
       }
 
       private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
+        log.info("Create collection {} with numShards {}", collectionName, numShards);
+
         HashPartitioner hp = new HashPartitioner();
         List<Range> ranges = hp.partitionRange(numShards, hp.fullRange());
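Note on the Overseer hunk above (SOLR-3971): when a core registered without a
numShards hint, AssignShard was handed null and a 1-shard collection could grow
a second slice. A toy model of the corrected resolution order - not actual
Overseer code, and the default-to-1 fallback is an assumption about
AssignShard's behavior when no hint is present:

    // NumShardsSketch.java - illustration only, not part of the patch.
    public class NumShardsSketch {
      static int resolveNumShards(Integer numShardsHint, Integer existingSliceCount) {
        if (existingSliceCount != null) {
          // fix: an existing collection's current slice count always wins
          // over whatever hint (or missing hint) the registration carried
          return existingSliceCount;
        }
        // new collection: fall back to the hint (assumed to default to 1)
        return numShardsHint == null ? 1 : numShardsHint;
      }
    }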
core=" + coreName); try { @@ -463,7 +463,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { } } else { SolrException.log(log, - "Recovery failed - max retries exceeded. core=" + coreName); + "Recovery failed - max retries exceeded (" + retries + "). core=" + coreName); try { recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor()); @@ -482,6 +482,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { try { // start at 1 sec and work up to a couple min double loopCount = Math.min(Math.pow(2, retries), 600); + log.info("Wait {} seconds before trying to recover again ({})", loopCount, retries); for (int i = 0; i < loopCount; i++) { if (isClosed()) break; // check if someone closed us Thread.sleep(STARTING_RECOVERY_DELAY); diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java index 94cf3f932eb..1c265ea0c07 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java @@ -176,7 +176,7 @@ public class SyncStrategy { // if we can't reach a replica for sync, we still consider the overall sync a success // TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach // to recover once more? - PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().numRecordsToKeep, true); + PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().numRecordsToKeep, true, true); return peerSync.sync(); } diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index db869ff4651..ed991044d21 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -781,6 +781,7 @@ public final class ZkController { //System.out.println(Thread.currentThread().getStackTrace()[3]); Integer numShards = cd.getCloudDescriptor().getNumShards(); if (numShards == null) { //XXX sys prop hack + log.info("numShards not found on descriptor - reading it from system property"); numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP); } diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index c23f4501440..2e53895f227 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -525,7 +525,7 @@ public class RealTimeGetComponent extends SearchComponent boolean cantReachIsSuccess = rb.req.getParams().getBool("cantReachIsSuccess", false); - PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions, cantReachIsSuccess); + PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions, cantReachIsSuccess, true); boolean success = peerSync.sync(); // TODO: more complex response? 
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index 0466864595d..867cb91c76f 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -19,6 +19,7 @@ package org.apache.solr.update;
 
 import java.io.IOException;
 import java.net.ConnectException;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -79,6 +80,7 @@ public class PeerSync {
   private long ourLowThreshold;  // 20th percentile
   private long ourHighThreshold; // 80th percentile
   private boolean cantReachIsSuccess;
+  private boolean getNoVersionsIsSuccess;
   private static final HttpClient client;
   static {
     ModifiableSolrParams params = new ModifiableSolrParams();
@@ -129,14 +131,15 @@ public class PeerSync {
   }
   
   public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
-    this(core, replicas, nUpdates, false);
+    this(core, replicas, nUpdates, false, true);
   }
   
-  public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess) {
+  public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
     this.replicas = replicas;
     this.nUpdates = nUpdates;
     this.maxUpdates = nUpdates;
     this.cantReachIsSuccess = cantReachIsSuccess;
+    this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
     
     uhandler = core.getUpdateHandler();
@@ -301,7 +304,7 @@ public class PeerSync {
       Throwable solrException = ((SolrServerException) srsp.getException())
           .getRootCause();
       if (solrException instanceof ConnectException || solrException instanceof ConnectTimeoutException
-          || solrException instanceof NoHttpResponseException) {
+          || solrException instanceof NoHttpResponseException || solrException instanceof SocketException) {
         log.warn(msg() + " couldn't connect to " + srsp.getShardAddress() + ", counting as success");
         return true;
@@ -343,7 +346,7 @@ public class PeerSync {
     log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
     
     if (otherVersions.size() == 0) {
-      return true;
+      return getNoVersionsIsSuccess;
     }
     
     boolean completeList = otherVersions.size() < nUpdates;  // do we have their complete list of updates?
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 726c3801879..efad9031e96 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -101,7 +101,6 @@ public class SolrCmdDistributor {
   
   public void finish() {
 
-    // piggyback on any outstanding adds or deletes if possible.
     flushAdds(1);
     flushDeletes(1);
@@ -150,6 +149,12 @@ public class SolrCmdDistributor {
   
   public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
       ModifiableSolrParams params) throws IOException {
+    
+    // make sure we are ordered
+    flushAdds(1);
+    flushDeletes(1);
+
+    
     // Wait for all outstanding responses to make sure that a commit
     // can't sneak in ahead of adds or deletes we already sent.
     // We could do this on a per-server basis, but it's more complex
@@ -163,7 +168,7 @@ public class SolrCmdDistributor {
     
     addCommit(ureq, cmd);
     
-    log.info("Distrib commit to:" + nodes);
+    log.info("Distrib commit to:" + nodes + " params:" + params);
     
     for (Node node : nodes) {
       submit(ureq, node);
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
index d98dd779ae9..5e603574c07 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
@@ -59,11 +59,11 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
         "now is the time for all good men", "foo_f", 1.414f, "foo_b", "true",
         "foo_d", 1.414d);
     
-    // make sure we are in a steady state...
-    waitForRecoveriesToFinish(false);
-    
     commit();
     
+    // make sure we are in a steady state...
+    waitForRecoveriesToFinish(false);
+    
     assertDocCounts(false);
     
     indexAbunchOfDocs();
@@ -203,6 +203,9 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
     }
     
     commit();
+    
+    printLayout();
+
     query("q", "*:*", "sort", "n_tl1 desc");
     
     // long cloudClientDocs = cloudClient.query(new
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
index ab55358fe61..0f5862df18d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -66,9 +67,11 @@ import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.update.DirectUpdateHandler2;
 import org.apache.solr.update.SolrCmdDistributor.Request;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.junit.Before;
@@ -124,7 +127,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     fixShardCount = true;
     
     sliceCount = 2;
-    shardCount = 3;
+    shardCount = 4;
     
     completionService = new ExecutorCompletionService<Request>(executor);
     pending = new HashSet<Future<Request>>();
@@ -319,17 +322,18 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     
     // would be better if these where all separate tests - but much, much
     // slower
-    doOptimisticLockingAndUpdating();
-    testMultipleCollections();
-    testANewCollectionInOneInstance();
-    testSearchByCollectionName();
-    testANewCollectionInOneInstanceWithManualShardAssignement();
-    testNumberOfCommitsWithCommitAfterAdd();
-    
-    testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-explicit");
-    testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-implicit");
-    
-    testCollectionsAPI();
+//    doOptimisticLockingAndUpdating();
+//    testMultipleCollections();
+//    testANewCollectionInOneInstance();
+//    testSearchByCollectionName();
+//    testANewCollectionInOneInstanceWithManualShardAssignement();
+//    testNumberOfCommitsWithCommitAfterAdd();
+//
+//    testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-explicit");
+//    testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-implicit");
+//
+//    testCollectionsAPI();
+    testCoreUnloadAndLeaders();
     
     // Thread.sleep(10000000000L);
     if (DEBUG) {
@@ -337,6 +341,215 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     }
   }
   
+  /**
+   * @throws Exception on any problem
+   */
+  private void testCoreUnloadAndLeaders() throws Exception {
+    // create a new collection
+    SolrServer client = clients.get(0);
+    String url1 = getBaseUrl(client);
+    HttpSolrServer server = new HttpSolrServer(url1);
+    
+    Create createCmd = new Create();
+    createCmd.setCoreName("unloadcollection1");
+    createCmd.setCollection("unloadcollection");
+    createCmd.setNumShards(1);
+    String core1DataDir = dataDir.getAbsolutePath() + File.separator + System.currentTimeMillis() + "unloadcollection1" + "_1n";
+    createCmd.setDataDir(core1DataDir);
+    server.request(createCmd);
+    
+    zkStateReader.updateClusterState(true);
+    
+    int slices = zkStateReader.getClusterState().getCollectionStates().get("unloadcollection").size();
+    assertEquals(1, slices);
+    
+    client = clients.get(1);
+    String url2 = getBaseUrl(client);
+    server = new HttpSolrServer(url2);
+    
+    createCmd = new Create();
+    createCmd.setCoreName("unloadcollection2");
+    createCmd.setCollection("unloadcollection");
+    String core2dataDir = dataDir.getAbsolutePath() + File.separator + System.currentTimeMillis() + "unloadcollection1" + "_2n";
+    createCmd.setDataDir(core2dataDir);
+    server.request(createCmd);
+    
+    zkStateReader.updateClusterState(true);
+    slices = zkStateReader.getClusterState().getCollectionStates().get("unloadcollection").size();
+    assertEquals(1, slices);
+    
+    waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
+    
+    ZkCoreNodeProps leaderProps = getLeaderUrlFromZk("unloadcollection", "shard1");
+    
+    Random random = random();
+    HttpSolrServer collectionClient;
+    if (random.nextBoolean()) {
+      collectionClient = new HttpSolrServer(leaderProps.getCoreUrl());
+      // let's try to use the solrj client to index and retrieve a couple of
+      // documents
+      SolrInputDocument doc1 = getDoc(id, 6, i1, -600, tlong, 600, t1,
+          "humpty dumpy sat on a wall");
+      SolrInputDocument doc2 = getDoc(id, 7, i1, -600, tlong, 600, t1,
+          "humpty dumpy3 sat on a walls");
+      SolrInputDocument doc3 = getDoc(id, 8, i1, -600, tlong, 600, t1,
+          "humpty dumpy2 sat on a walled");
+      collectionClient.add(doc1);
+      collectionClient.add(doc2);
+      collectionClient.add(doc3);
+      collectionClient.commit();
+    }
+    
+    // create another replica for our collection
+    client = clients.get(2);
+    String url3 = getBaseUrl(client);
+    server = new HttpSolrServer(url3);
+    
+    createCmd = new Create();
+    createCmd.setCoreName("unloadcollection3");
+    createCmd.setCollection("unloadcollection");
+    String core3dataDir = dataDir.getAbsolutePath() + File.separator + System.currentTimeMillis() + "unloadcollection" + "_3n";
+    createCmd.setDataDir(core3dataDir);
+    server.request(createCmd);
+    
+    Thread.sleep(1000);
+    
+    waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
+    
+    // so that we start with some versions when we reload...
+    DirectUpdateHandler2.commitOnClose = false;
+    
+    HttpSolrServer addClient = new HttpSolrServer(url3 + "/unloadcollection3");
+    // add a few docs
+    for (int x = 20; x < 100; x++) {
+      SolrInputDocument doc1 = getDoc(id, x, i1, -600, tlong, 600, t1,
+          "humpty dumpy sat on a wall");
+      addClient.add(doc1);
+    }
+    
+    // don't commit, so they remain in the transaction log
+    //collectionClient.commit();
+    
+    // unload the leader
+    collectionClient = new HttpSolrServer(leaderProps.getBaseUrl());
+    
+    Unload unloadCmd = new Unload(false);
+    unloadCmd.setCoreName(leaderProps.getCoreName());
+    ModifiableSolrParams p = (ModifiableSolrParams) unloadCmd.getParams();
+    
+    collectionClient.request(unloadCmd);
+    
+//    Thread.currentThread().sleep(500);
+//    printLayout();
+    
+    int tries = 20;
+    while (leaderProps.getCoreUrl().equals(zkStateReader.getLeaderUrl("unloadcollection", "shard1", 15000))) {
+      Thread.sleep(100);
+      if (tries-- == 0) {
+        fail("Leader never changed");
+      }
+    }
+    
+    // ensure there is a leader
+    zkStateReader.getLeaderProps("unloadcollection", "shard1", 15000);
+    
+    addClient = new HttpSolrServer(url2 + "/unloadcollection2");
+    // add a few docs while the leader is down
+    for (int x = 101; x < 200; x++) {
+      SolrInputDocument doc1 = getDoc(id, x, i1, -600, tlong, 600, t1,
+          "humpty dumpy sat on a wall");
+      addClient.add(doc1);
+    }
+    
+    // create another replica for our collection
+    client = clients.get(3);
+    String url4 = getBaseUrl(client);
+    server = new HttpSolrServer(url4);
+    
+    createCmd = new Create();
+    createCmd.setCoreName("unloadcollection4");
+    createCmd.setCollection("unloadcollection");
+    String core4dataDir = dataDir.getAbsolutePath() + File.separator + System.currentTimeMillis() + "unloadcollection" + "_4n";
+    createCmd.setDataDir(core4dataDir);
+    server.request(createCmd);
+    
+    Thread.sleep(1000);
+    
+    waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
+    
+    // unload the leader again
+    leaderProps = getLeaderUrlFromZk("unloadcollection", "shard1");
+    collectionClient = new HttpSolrServer(leaderProps.getBaseUrl());
+    
+    unloadCmd = new Unload(false);
+    unloadCmd.setCoreName(leaderProps.getCoreName());
+    p = (ModifiableSolrParams) unloadCmd.getParams();
+    collectionClient.request(unloadCmd);
+    
+    tries = 20;
+    while (leaderProps.getCoreUrl().equals(zkStateReader.getLeaderUrl("unloadcollection", "shard1", 15000))) {
+      Thread.sleep(100);
+      if (tries-- == 0) {
+        fail("Leader never changed");
+      }
+    }
+    
+    zkStateReader.getLeaderProps("unloadcollection", "shard1", 15000);
+    
+    // set this back
+    DirectUpdateHandler2.commitOnClose = true;
+    
+    // bring the downed leader back as replica
+    server = new HttpSolrServer(leaderProps.getBaseUrl());
+    
+    createCmd = new Create();
+    createCmd.setCoreName(leaderProps.getCoreName());
+    createCmd.setCollection("unloadcollection");
+    createCmd.setDataDir(core1DataDir);
+    server.request(createCmd);
+    
+    Thread.sleep(1000);
+    
+    waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
+    
+    server = new HttpSolrServer(url1 + "/unloadcollection");
+    // System.out.println(server.query(new SolrQuery("*:*")).getResults().getNumFound());
+    server = new HttpSolrServer(url2 + "/unloadcollection");
+    server.commit();
+    SolrQuery q = new SolrQuery("*:*");
+    q.set("distrib", false);
+    long found1 = server.query(q).getResults().getNumFound();
+    server = new HttpSolrServer(url3 + "/unloadcollection");
+    server.commit();
+    q = new SolrQuery("*:*");
+    q.set("distrib", false);
+    long found3 = server.query(q).getResults().getNumFound();
HttpSolrServer(url4 + "/unloadcollection"); + server.commit(); + q = new SolrQuery("*:*"); + q.set("distrib", false); + long found4 = server.query(q).getResults().getNumFound(); + + // all 3 shards should now have the same number of docs + assertEquals(found1, found3); + assertEquals(found3, found4); + + } + + + private String getBaseUrl(SolrServer client) { + String url2 = ((HttpSolrServer) client).getBaseURL() + .substring( + 0, + ((HttpSolrServer) client).getBaseURL().length() + - DEFAULT_COLLECTION.length() -1); + return url2; + } + + private void testCollectionsAPI() throws Exception { // TODO: fragile - because we dont pass collection.confName, it will only @@ -347,31 +560,12 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { // create new collections rapid fire Map> collectionInfos = new HashMap>(); int cnt = atLeast(3); - for (int i = 0; i < cnt; i++) { - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set("action", CollectionAction.CREATE.toString()); - int numShards = _TestUtil.nextInt(random(), 0, shardCount) + 1; - int numReplicas = _TestUtil.nextInt(random(), 0, 5) + 1; - params.set("numShards", numShards); - params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas); - String collectionName = "awholynewcollection_" + i; - int clientIndex = random().nextInt(2); - List list = new ArrayList(); - list.add(numShards); - list.add(numReplicas); - collectionInfos.put(collectionName, list); - params.set("name", collectionName); - SolrRequest request = new QueryRequest(params); - request.setPath("/admin/collections"); - - final String baseUrl = ((HttpSolrServer) clients.get(clientIndex)).getBaseURL().substring( - 0, - ((HttpSolrServer) clients.get(clientIndex)).getBaseURL().length() - - DEFAULT_COLLECTION.length() - 1); - - createNewSolrServer("", baseUrl).request(request); - } + for (int i = 0; i < cnt; i++) { + createCollection(collectionInfos, i, + _TestUtil.nextInt(random(), 0, shardCount) + 1, + _TestUtil.nextInt(random(), 0, 5) + 1); + } Set>> collectionInfosEntrySet = collectionInfos.entrySet(); for (Entry> entry : collectionInfosEntrySet) { @@ -387,8 +581,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { waitForNon403or404or503(collectionClient); } - for (int i = 0; i < cnt; i++) { - waitForRecoveriesToFinish("awholynewcollection_" + i, zkStateReader, false); + for (int j = 0; j < cnt; j++) { + waitForRecoveriesToFinish("awholynewcollection_" + j, zkStateReader, false); } List collectionNameList = new ArrayList(); @@ -400,7 +594,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { HttpSolrServer collectionClient = new HttpSolrServer(url); - // lets try and use the solrj client to index and retrieve a couple documents + // lets try and use the solrj client to index a couple documents SolrInputDocument doc1 = getDoc(id, 6, i1, -600, tlong, 600, t1, "humpty dumpy sat on a wall"); SolrInputDocument doc2 = getDoc(id, 7, i1, -600, tlong, 600, t1, @@ -442,6 +636,9 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { boolean allTimesAreCorrect = waitForReloads(collectionName, urlToTimeBefore); assertTrue("some core start times did not change on reload", allTimesAreCorrect); + + waitForRecoveriesToFinish("awholynewcollection_" + (cnt - 1), zkStateReader, false); + // remove a collection params = new ModifiableSolrParams(); params.set("action", CollectionAction.DELETE.toString()); @@ -453,9 +650,37 @@ public class BasicDistributedZkTest extends 
     // ensure its out of the state
     checkForMissingCollection(collectionName);
+    
+    //collectionNameList.remove(collectionName);
   }
   
+  protected void createCollection(Map<String,List<Integer>> collectionInfos,
+      int i, int numShards, int numReplicas) throws SolrServerException, IOException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("action", CollectionAction.CREATE.toString());
+    
+    params.set("numShards", numShards);
+    params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas);
+    String collectionName = "awholynewcollection_" + i;
+    int clientIndex = random().nextInt(2);
+    List<Integer> list = new ArrayList<Integer>();
+    list.add(numShards);
+    list.add(numReplicas);
+    collectionInfos.put(collectionName, list);
+    params.set("name", collectionName);
+    SolrRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+    
+    final String baseUrl = ((HttpSolrServer) clients.get(clientIndex)).getBaseURL().substring(
+        0,
+        ((HttpSolrServer) clients.get(clientIndex)).getBaseURL().length()
+            - DEFAULT_COLLECTION.length() - 1);
+    
+    createNewSolrServer("", baseUrl).request(request);
+  }
+  
   private boolean waitForReloads(String collectionName, Map<String,Long> urlToTimeBefore) throws SolrServerException, IOException {
@@ -537,6 +762,15 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     
     throw new RuntimeException("Could not find a live node for collection:" + collection);
   }
+  
+  private ZkCoreNodeProps getLeaderUrlFromZk(String collection, String slice) {
+    ClusterState clusterState = solrj.getZkStateReader().getClusterState();
+    ZkNodeProps leader = clusterState.getLeader(collection, slice);
+    if (leader == null) {
+      throw new RuntimeException("Could not find leader:" + collection + " " + slice);
+    }
+    return new ZkCoreNodeProps(leader);
+  }
   
   private void waitForNon403or404or503(HttpSolrServer collectionClient)
       throws Exception {
@@ -1099,5 +1333,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     }
     System.clearProperty("numShards");
     System.clearProperty("zkHost");
+    
+    // insurance
+    DirectUpdateHandler2.commitOnClose = true;
   }
 }
diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
index 565130fd107..0ae611fa694 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.BaseDistributedSearchTestCase;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -38,9 +39,15 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.SolrEventListener;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.servlet.SolrDispatchFilter;
 import org.apache.solr.update.SolrCmdDistributor.Node;
 import org.apache.solr.update.SolrCmdDistributor.Response;
 import org.apache.solr.update.SolrCmdDistributor.StdNode;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 
 public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
@@ -92,6 +99,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(5, executor);
     
     ModifiableSolrParams params = new ModifiableSolrParams();
+    
     List<Node> nodes = new ArrayList<Node>();
 
     ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
@@ -103,12 +111,17 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     
     AddUpdateCommand cmd = new AddUpdateCommand(null);
     cmd.solrDoc = sdoc("id", 1);
+    params = new ModifiableSolrParams();
+    
     cmdDistrib.distribAdd(cmd, nodes, params);
     
     CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
+    params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
     cmdDistrib.distribCommit(ccmd, nodes, params);
     cmdDistrib.finish();
 
+    
     Response response = cmdDistrib.getResponse();
     
     assertEquals(response.errors.toString(), 0, response.errors.size());
@@ -125,18 +138,26 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     // add another 2 docs to control and 3 to client
     cmdDistrib = new SolrCmdDistributor(5, executor);
     cmd.solrDoc = sdoc("id", 2);
+    params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
     cmdDistrib.distribAdd(cmd, nodes, params);
     
     AddUpdateCommand cmd2 = new AddUpdateCommand(null);
     cmd2.solrDoc = sdoc("id", 3);
+    params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
     cmdDistrib.distribAdd(cmd2, nodes, params);
     
     AddUpdateCommand cmd3 = new AddUpdateCommand(null);
     cmd3.solrDoc = sdoc("id", 4);
+    params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
     cmdDistrib.distribAdd(cmd3, Collections.singletonList(nodes.get(1)), params);
     
+    params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
     cmdDistrib.distribCommit(ccmd, nodes, params);
     cmdDistrib.finish();
     response = cmdDistrib.getResponse();
@@ -156,9 +177,18 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     
     DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
     dcmd.id = "2";
+    
+    cmdDistrib = new SolrCmdDistributor(5, executor);
+    
+    params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+    
     cmdDistrib.distribDelete(dcmd, nodes, params);
+    
+    params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+    
     cmdDistrib.distribCommit(ccmd, nodes, params);
     cmdDistrib.finish();
@@ -184,7 +214,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     
     cmdDistrib = new SolrCmdDistributor(5, executor);
     
-    int cnt = atLeast(201);
+    int cnt = atLeast(303);
     
     for (int i = 0; i < cnt; i++) {
       nodes.clear();
       for (SolrServer c : clients) {
@@ -194,13 +224,13 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
         HttpSolrServer httpClient = (HttpSolrServer) c;
         nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
             httpClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
-        System.out.println("node props:" + nodeProps);
         nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
       }
       
       AddUpdateCommand c = new AddUpdateCommand(null);
       c.solrDoc = sdoc("id", id++);
       if (nodes.size() > 0) {
+        params = new ModifiableSolrParams();
         cmdDistrib.distribAdd(c, nodes, params);
       }
     }
@@ -214,11 +244,37 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
     }
     
+    
+    final AtomicInteger commits = new AtomicInteger();
+    for(JettySolrRunner jetty : jettys) {
+      CoreContainer cores = ((SolrDispatchFilter) jetty.getDispatchFilter().getFilter()).getCores();
+      SolrCore core = cores.getCore("collection1");
+      try {
+        core.getUpdateHandler().registerCommitCallback(new SolrEventListener() {
+          @Override
+          public void init(NamedList args) {}
+          @Override
+          public void postSoftCommit() {}
+          @Override
+          public void postCommit() {
+            commits.incrementAndGet();
+          }
+          @Override
+          public void newSearcher(SolrIndexSearcher newSearcher,
+              SolrIndexSearcher currentSearcher) {}
+        });
+      } finally {
+        core.close();
+      }
+    }
+    
     params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
     cmdDistrib.distribCommit(ccmd, nodes, params);
-    
     cmdDistrib.finish();
+    
+    assertEquals(shardCount, commits.get());
     
     for (SolrServer c : clients) {
       NamedList<Object> resp = c.request(new LukeRequest());
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7024b495618..edf0d62da45 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -186,7 +186,7 @@ public class ZkStateReader {
           if (EventType.None.equals(event.getType())) {
             return;
           }
-          log.info("A cluster state change has occurred - updating...");
+          log.info("A cluster state change has occurred - updating... ({})", ZkStateReader.this.clusterState.getLiveNodes().size());
           try {
             
             // delayed approach
@@ -235,13 +235,13 @@ public class ZkStateReader {
           if (EventType.None.equals(event.getType())) {
             return;
           }
-          log.info("Updating live nodes");
           try {
             // delayed approach
             // ZkStateReader.this.updateClusterState(false, true);
             synchronized (ZkStateReader.this.getUpdateLock()) {
               List<String> liveNodes = zkClient.getChildren(
                   LIVE_NODES_ZKNODE, this, true);
+              log.info("Updating live nodes... ({})", liveNodes.size());
               Set<String> liveNodesSet = new HashSet<String>();
               liveNodesSet.addAll(liveNodes);
               ClusterState clusterState = new ClusterState(
@@ -296,7 +296,7 @@ public class ZkStateReader {
           clusterState = ClusterState.load(zkClient, liveNodesSet);
         } else {
-          log.info("Updating live nodes from ZooKeeper... ");
+          log.info("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
           clusterState = new ClusterState(
               ZkStateReader.this.clusterState.getZkClusterStateVersion(),
               liveNodesSet, ZkStateReader.this.clusterState.getCollectionStates());
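For context on the SolrCmdDistributor change above (SOLR-3933): adds and
deletes are buffered for batching, so a commit issued through distribCommit
could previously be submitted while documents still sat in those buffers and
reach a replica first; the new flushAdds(1)/flushDeletes(1) calls drain the
buffers before the commit goes out, which is what the commit-callback counting
added to SolrCmdDistributorTest verifies. A toy model of the ordering
guarantee, assuming nothing beyond what the hunk shows:

    // CommitOrderingSketch.java - illustration only, not part of the patch.
    import java.util.ArrayList;
    import java.util.List;

    public class CommitOrderingSketch {
      private final List<String> buffered = new ArrayList<String>();
      private final List<String> submitted = new ArrayList<String>();

      void distribAdd(String doc) {
        buffered.add(doc); // batched, not submitted immediately
      }

      void distribCommit() {
        // SOLR-3933 fix: drain the buffers first so the commit cannot be
        // submitted ahead of adds/deletes still waiting in them
        flush();
        submitted.add("commit");
      }

      private void flush() {
        submitted.addAll(buffered);
        buffered.clear();
      }
    }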